Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Add support for empty tasks (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 authored Aug 22, 2019
1 parent 0c5364a commit 2b5adf6
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 26 deletions.
59 changes: 36 additions & 23 deletions core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
if (updateNodeToCompleted) {
logger.debug("processCompletedTask: Task [" + taskInfo.task.name + "] updating node to completed")
if (TaskStatus.isTaskNotDone(taskInfo.status, failedIsDone = true)) {
throw new RuntimeException("Processing a completed task but it was not done!")
throw new RuntimeException(s"Processing a completed task but it was not done! status: ${taskInfo.status}")
}
completeGraphNode(node, Some(taskInfo))
}
Expand All @@ -313,10 +313,17 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
*/
private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = {
val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks()
completedTasks.keys.foreach(taskId => processCompletedTask(taskId))
logger.debug("updateCompletedTasks: found " + completedTasks.size + " completed tasks")
for (taskId <- completedTasks.keys) {
val name = this(taskId).task.name
val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]).toSeq
val completedTaskIds = completedTasks.keys ++ emptyTasks.map(_.taskId)

emptyTasks.foreach { node =>
node.taskInfo.status = TaskStatus.SUCCEEDED
logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed")
}

completedTaskIds.foreach { taskId =>
processCompletedTask(taskId)
val name = this(taskId).task.name
val status = this(taskId).taskInfo.status
logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]")
}
Expand Down Expand Up @@ -358,6 +365,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor)
// - if this node has already been expanded and now has no predecessors, then move it to the next state.
// - if it hasn't been expanded and now has no predecessors, it should get expanded later
if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}")
if (!node.hasPredecessor && node.state == ONLY_PREDECESSORS) {
val taskInfo = node.taskInfo
node.task match {
Expand Down Expand Up @@ -390,15 +398,18 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
private def invokeGetTasks(node: GraphNode): Boolean = {

// get the list of tasks that this task generates
val tasks = try { node.task.getTasks.toList } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) }
logger.debug(f"0 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
val tasks = try { node.task.getTasks.toSeq } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) }
// NB: we don't create a new node for this task if it just returns itself
// NB: always check for cycles, since we don't know when they could be introduced. We will check
// for cycles in [[addTask]] so only check here if [[getTasks]] returns itself.
tasks match {
case Nil => // no tasks returned
throw new IllegalStateException(s"No tasks to schedule for task: [${node.task.name}]")
logger.debug(f"invokeGetTasks 1 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
// set the submission time stamp
node.taskInfo.submissionDate = Some(Instant.now())
false
case x :: Nil if x == node.task => // one task and it returned itself
logger.debug(f"invokeGetTasks 2 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
// check for cycles only when we have a unit task for which calling [[getTasks] returns itself.
checkForCycles(task = node.task)
// verify we have a UnitTask
Expand All @@ -408,13 +419,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
}
false
case _ =>
logger.debug(f"3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
logger.debug(f"invokeGetTasks 3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
// set the submission time stamp
node.taskInfo.submissionDate = Some(Instant.now())
// we will make this task dependent on the tasks it creates...
if (tasks.contains(node.task)) throw new IllegalStateException(s"Task [${node.task.name}] contained itself in the list returned by getTasks")
// track the new tasks. If they are already added, that's fine too.
val taskIds: List[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) }
val taskIds: Seq[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) }
// make this node dependent on those tasks
taskIds.map(taskId => node.addPredecessors(this(taskId)))
// we may need to update precedessors if a returned task was already completed
Expand Down Expand Up @@ -442,11 +453,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
node.state = GraphNodeState.ONLY_PREDECESSORS
taskInfo.status = TaskStatus.STARTED
}
else { // must have predecessors, since `getTasks` did not return itself, and therefore we made this task dependent on others.
throw new IllegalStateException(
"Updating a non-UnitTask's state and status, but could not find any predecessors. " +
"Were tasks returned by its get tasks? " +
s"Task: [${node.task.name}]")
else { // if `getTasks` returned no tasks, then just update it to succeeded
node.state = NO_PREDECESSORS
taskInfo.status = TaskStatus.STARTED
}
}
}
Expand Down Expand Up @@ -512,12 +521,14 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
protected[core] def readyTasksList: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask])

override def stepExecution(): (Iterable[Task], Iterable[Task], Iterable[Task], Iterable[Task]) = {
logger.debug("runSchedulerOnce: starting one round of execution")
logger.debug("stepExecution: starting one round of execution")

// get newly completed tasks
val completedTasks = updateCompletedTasks()
val canDoAnything = completedTasks.nonEmpty || taskExecutionRunner.runningTaskIds.isEmpty

logger.debug(s"stepExecution: canDoAnything=$canDoAnything")

if (canDoAnything) {
// check if we now know about the predecessors for orphan tasks.
updateOrphans()
Expand All @@ -533,20 +544,22 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap

// get the tasks that are eligible for execution (tasks with no dependents)
val readyTasks: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask])
logger.debug("runSchedulerOnce: found " + readyTasks.size + " readyTasks tasks")
val (emptyTasks: List[Task], readyTasks: List[Task]) = {
graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toList.partition(_.isInstanceOf[Task.EmptyTask])
}
logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks")

// get the list of tasks to schedule
val tasksToSchedule: Map[UnitTask, ResourceSet] = if (!canDoAnything) Map.empty else {
val tasks = scheduler.schedule(
runningTasks = runningTasks,
readyTasks = readyTasks,
readyTasks = readyTasks.filter(_.isInstanceOf[UnitTask]).map(_.asInstanceOf[UnitTask]),
systemCores = taskManagerResources.cores,
systemMemory = taskManagerResources.systemMemory,
jvmMemory = taskManagerResources.jvmMemory
)

logger.debug("runSchedulerOnce: scheduling " + tasks.size + " tasks")
logger.debug("stepExecution: scheduling " + tasks.size + " tasks")

// add the given tasks to the task runner with the appropriate (as determined by the scheduler) resources.
scheduleAndRunTasks(tasksToSchedule = tasks)
Expand All @@ -555,13 +568,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
}

// for debugging purposes
logger.debug("runSchedulerOnce: finishing one round of execution")
logger.debug("runSchedulerOnce: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule")
logger.debug("stepExecution: finishing one round of execution")
logger.debug("stepExecution: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule")

(
readyTasks,
tasksToSchedule.keys,
runningTasks.keys,
runningTasks.keys ++ emptyTasks,
completedTasks.keys.map(taskId => this(taskId).task)
)
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,7 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None,

/** True if we this pipeline is tracking this direct ancestor task, false otherwise. */
def contains(task: Task): Boolean = tasks.contains(task)

/** Builds an empty task for use within this pipeline. */
def emptyTask: Task = Task.empty
}
9 changes: 9 additions & 0 deletions core/src/main/scala/dagr/core/tasksystem/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ import scala.util.control.Breaks._
/** Utility methods to aid in working with a task. */
object Task {

/** Marker trait for empty tasks. */
sealed trait EmptyTask extends Task

/** A task that does nothing. */
def empty: Task = new EmptyTask {
name = "Task.empty"
override def getTasks: Iterable[_ <: Task] = None
}

/** Helper class for Tarjan's strongly connected components algorithm */
private class TarjanData {
var index: Int = 0
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package dagr.core.cmdline

import dagr.core.cmdline.pipelines.PipelineFour
import dagr.core.cmdline.pipelines.{PipelineFour, PipelineBuildFailure}
import dagr.core.tasksystem.{NoOpInJvmTask, Pipeline}
import com.fulcrumgenomics.commons.io.Io
import com.fulcrumgenomics.sopt.util.TermCode
Expand Down Expand Up @@ -119,7 +119,7 @@ class DagrCoreMainTest extends UnitSpec with BeforeAndAfterAll with CaptureSyste
}

it should "print the execution failure upon failure" in {
val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineFour])))
val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineBuildFailure])))
log should include("Elapsed time:")
log should include("dagr failed")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ private[cmdline] case class PipelineFour
@clp(description = "", group = classOf[TestGroup], hidden = true)
private[cmdline] case class PipelineWithMutex
(@arg(mutex = Array("another")) var argument: String, @arg(mutex = Array("argument")) var another: String) extends CommandLineTaskTesting // argument should be required

@clp(description = "", group = classOf[TestGroup], hidden = true)
private[cmdline] case class PipelineBuildFailure
(@arg var argument: String = "default", @arg var flag: Boolean = false) extends CommandLineTaskTesting {
override def build(): Unit = throw new IllegalStateException()
}
31 changes: 30 additions & 1 deletion core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,39 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true)
}

it should "run an empty task" in {
val task = Task.empty
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(task)
val taskMap = taskManager.runToCompletion(true)
taskMap.size shouldBe 1
val taskInfo = taskMap.valueFor(task).get
TaskStatus.isTaskDone(taskInfo.status, failedIsDone = false) shouldBe true
}

it should "run an empty task as part of a pipeline" in {
val pipeline = new Pipeline() {
name = "Pipeline"
override def build(): Unit = {
def newTask(name: String) = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty withName name
val middle = Task.empty
root ==> newTask("exit0-1") ==> middle ==> newTask("exit0-2")
}
}

val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(pipeline)
val taskMap = taskManager.runToCompletion(true)
taskMap.size shouldBe 4
taskMap.foreach { case (_, info) =>
TaskStatus.isTaskDone(info.status, failedIsDone = false) shouldBe true
}
}


it should "run a simple task that fails but we allow it" in {
val task: UnitTask = new ShellCommand("exit", "1") withName "exit 1"
val taskManager: TestTaskManager = getDefaultTaskManager()

tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true)
}

Expand Down

0 comments on commit 2b5adf6

Please sign in to comment.