From 86e749b6ec753bacadd00dd73d2cff480e055533 Mon Sep 17 00:00:00 2001
From: Nils Homer <nilshomer@gmail.com>
Date: Wed, 21 Aug 2019 21:48:42 -0700
Subject: [PATCH] Add support for empty tasks

---
 .../dagr/core/execsystem/TaskManager.scala    | 59 +++++++++++--------
 .../scala/dagr/core/tasksystem/Pipeline.scala |  3 +
 .../scala/dagr/core/tasksystem/Task.scala     |  9 +++
 .../dagr/core/cmdline/DagrCoreMainTest.scala  |  4 +-
 .../core/cmdline/pipelines/Pipelines.scala    |  6 ++
 .../core/execsystem/TaskManagerTest.scala     | 31 +++++++++-
 6 files changed, 86 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala
index f112bd63..d3046884 100644
--- a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala
+++ b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala
@@ -301,7 +301,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
     if (updateNodeToCompleted) {
       logger.debug("processCompletedTask: Task [" + taskInfo.task.name + "] updating node to completed")
       if (TaskStatus.isTaskNotDone(taskInfo.status, failedIsDone = true)) {
-        throw new RuntimeException("Processing a completed task but it was not done!")
+        throw new RuntimeException(s"Processing a completed task but it was not done! status: ${taskInfo.status}")
       }
       completeGraphNode(node, Some(taskInfo))
     }
@@ -313,10 +313,17 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
     */
   private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = {
     val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks()
-    completedTasks.keys.foreach(taskId => processCompletedTask(taskId))
-    logger.debug("updateCompletedTasks: found " + completedTasks.size + " completed tasks")
-    for (taskId <- completedTasks.keys) {
-      val name = this(taskId).task.name
+    val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]).toSeq
+    val completedTaskIds = completedTasks.keys ++ emptyTasks.map(_.taskId)
+
+    emptyTasks.foreach { node =>
+      node.taskInfo.status = TaskStatus.SUCCEEDED
+      logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed")
+    }
+
+    completedTaskIds.foreach { taskId =>
+      processCompletedTask(taskId)
+      val name   = this(taskId).task.name
       val status = this(taskId).taskInfo.status
       logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]")
     }
@@ -358,6 +365,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
       logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor)
       // - if this node has already been expanded and now has no predecessors, then move it to the next state.
       // - if it hasn't been expanded and now has no predecessors, it should get expanded later
+      if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}")
       if (!node.hasPredecessor && node.state == ONLY_PREDECESSORS) {
         val taskInfo = node.taskInfo
         node.task match {
@@ -390,15 +398,18 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
   private def invokeGetTasks(node: GraphNode): Boolean = {
 
     // get the list of tasks that this task generates
-    val tasks = try { node.task.getTasks.toList } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) }
-    logger.debug(f"0 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
+    val tasks = try { node.task.getTasks.toSeq } catch { case e: Exception => throw new TaskException(e, TaskStatus.FAILED_GET_TASKS) }
     // NB: we don't create a new node for this task if it just returns itself
     // NB: always check for cycles, since we don't know when they could be introduced.  We will check
     //     for cycles in [[addTask]] so only check here if [[getTasks]] returns itself.
     tasks match {
       case Nil => // no tasks returned
-        throw new IllegalStateException(s"No tasks to schedule for task: [${node.task.name}]")
+        logger.debug(f"invokeGetTasks 1 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
+        // set the submission time stamp
+        node.taskInfo.submissionDate = Some(Instant.now())
+        false
       case x :: Nil if x == node.task => // one task and it returned itself
+        logger.debug(f"invokeGetTasks 2 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
         // check for cycles only when we have a unit task for which calling [[getTasks] returns itself.
         checkForCycles(task = node.task)
         // verify we have a UnitTask
@@ -408,13 +419,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
         }
         false
       case _ =>
-        logger.debug(f"3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
+        logger.debug(f"invokeGetTasks 3 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}")
         // set the submission time stamp
         node.taskInfo.submissionDate = Some(Instant.now())
         // we will make this task dependent on the tasks it creates...
         if (tasks.contains(node.task)) throw new IllegalStateException(s"Task [${node.task.name}] contained itself in the list returned by getTasks")
         // track the new tasks. If they are already added, that's fine too.
-        val taskIds: List[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) }
+        val taskIds: Seq[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) }
         // make this node dependent on those tasks
         taskIds.map(taskId => node.addPredecessors(this(taskId)))
         // we may need to update precedessors if a returned task was already completed
@@ -442,11 +453,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
           node.state = GraphNodeState.ONLY_PREDECESSORS
           taskInfo.status = TaskStatus.STARTED
         }
-        else { // must have predecessors, since `getTasks` did not return itself, and therefore we made this task dependent on others.
-          throw new IllegalStateException(
-            "Updating a non-UnitTask's state and status, but could not find any predecessors.  " +
-              "Were tasks returned by its get tasks?  " +
-              s"Task: [${node.task.name}]")
+        else { // `getTasks` returned no tasks: mark the node as having no predecessors (status STARTED); empty tasks are set to SUCCEEDED later in `updateCompletedTasks`
+          node.state = NO_PREDECESSORS
+          taskInfo.status = TaskStatus.STARTED
         }
     }
   }
@@ -512,12 +521,14 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
   protected[core] def readyTasksList: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask])
 
   override def stepExecution(): (Iterable[Task], Iterable[Task], Iterable[Task], Iterable[Task]) = {
-    logger.debug("runSchedulerOnce: starting one round of execution")
+    logger.debug("stepExecution: starting one round of execution")
 
     // get newly completed tasks
     val completedTasks = updateCompletedTasks()
     val canDoAnything  = completedTasks.nonEmpty || taskExecutionRunner.runningTaskIds.isEmpty
 
+    logger.debug(s"stepExecution: canDoAnything=$canDoAnything")
+
     if (canDoAnything) {
       // check if we now know about the predecessors for orphan tasks.
       updateOrphans()
@@ -533,20 +544,22 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
     val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap
 
     // get the tasks that are eligible for execution (tasks with no dependents)
-    val readyTasks: List[UnitTask] = graphNodesInStateFor(NO_PREDECESSORS).toList.map(node => node.task.asInstanceOf[UnitTask])
-    logger.debug("runSchedulerOnce: found " + readyTasks.size + " readyTasks tasks")
+    val (emptyTasks: List[Task], readyTasks: List[Task]) = {
+      graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toList.partition(_.isInstanceOf[Task.EmptyTask])
+    }
+    logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks")
 
     // get the list of tasks to schedule
     val tasksToSchedule: Map[UnitTask, ResourceSet] = if (!canDoAnything) Map.empty else {
       val tasks = scheduler.schedule(
         runningTasks = runningTasks,
-        readyTasks   = readyTasks,
+        readyTasks   = readyTasks.filter(_.isInstanceOf[UnitTask]).map(_.asInstanceOf[UnitTask]),
         systemCores  = taskManagerResources.cores,
         systemMemory = taskManagerResources.systemMemory,
         jvmMemory    = taskManagerResources.jvmMemory
       )
 
-      logger.debug("runSchedulerOnce: scheduling " + tasks.size + " tasks")
+      logger.debug("stepExecution: scheduling " + tasks.size + " tasks")
 
       // add the given tasks to the task runner with the appropriate (as determined by the scheduler) resources.
       scheduleAndRunTasks(tasksToSchedule = tasks)
@@ -555,13 +568,13 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
     }
 
     // for debugging purposes
-    logger.debug("runSchedulerOnce: finishing one round of execution")
-    logger.debug("runSchedulerOnce: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule")
+    logger.debug("stepExecution: finishing one round of execution")
+    logger.debug("stepExecution: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule")
 
     (
       readyTasks,
       tasksToSchedule.keys,
-      runningTasks.keys,
+      runningTasks.keys ++ emptyTasks,
       completedTasks.keys.map(taskId => this(taskId).task)
     )
   }
diff --git a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
index 19033df1..910616a8 100644
--- a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
+++ b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
@@ -84,4 +84,7 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None,
 
   /** True if we this pipeline is tracking this direct ancestor task, false otherwise. */
   def contains(task: Task): Boolean = tasks.contains(task)
+
+  /** Builds an empty task for use within this pipeline. */
+  def emptyTask: Task = Task.empty
 }
diff --git a/core/src/main/scala/dagr/core/tasksystem/Task.scala b/core/src/main/scala/dagr/core/tasksystem/Task.scala
index 459158e9..38402212 100644
--- a/core/src/main/scala/dagr/core/tasksystem/Task.scala
+++ b/core/src/main/scala/dagr/core/tasksystem/Task.scala
@@ -33,6 +33,15 @@ import scala.util.control.Breaks._
 /** Utility methods to aid in working with a task. */
 object Task {
 
+  /** Marker trait identifying empty tasks; sealed, so implementations can only be defined in this file. */
+  sealed trait EmptyTask extends Task
+
+  /** Builds a new task named "Task.empty" that does nothing: its `getTasks` yields no tasks. */
+  def empty: Task = new EmptyTask {
+    name = "Task.empty"
+    override def getTasks: Iterable[_ <: Task] = Nil
+  }
+
   /** Helper class for Tarjan's strongly connected components algorithm */
   private class TarjanData {
     var index: Int = 0
diff --git a/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala b/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala
index 631ff30d..b6a564a5 100644
--- a/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala
+++ b/core/src/test/scala/dagr/core/cmdline/DagrCoreMainTest.scala
@@ -24,7 +24,7 @@
 
 package dagr.core.cmdline
 
-import dagr.core.cmdline.pipelines.PipelineFour
+import dagr.core.cmdline.pipelines.{PipelineFour, PipelineBuildFailure}
 import dagr.core.tasksystem.{NoOpInJvmTask, Pipeline}
 import com.fulcrumgenomics.commons.io.Io
 import com.fulcrumgenomics.sopt.util.TermCode
@@ -119,7 +119,7 @@ class DagrCoreMainTest extends UnitSpec with BeforeAndAfterAll with CaptureSyste
   }
 
   it should "print the execution failure upon failure" in {
-    val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineFour])))
+    val (_, _, _, log) = testParse(Array[String](nameOf(classOf[PipelineBuildFailure])))
     log should include("Elapsed time:")
     log should include("dagr failed")
   }
diff --git a/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala b/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala
index 1531827e..a5896f3f 100644
--- a/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala
+++ b/core/src/test/scala/dagr/core/cmdline/pipelines/Pipelines.scala
@@ -57,3 +57,9 @@ private[cmdline] case class PipelineFour
 @clp(description = "", group = classOf[TestGroup], hidden = true)
 private[cmdline] case class PipelineWithMutex
 (@arg(mutex = Array("another")) var argument: String, @arg(mutex = Array("argument")) var another: String) extends CommandLineTaskTesting // argument should be required
+
+@clp(description = "", group = classOf[TestGroup], hidden = true)
+private[cmdline] case class PipelineBuildFailure
+(@arg var argument: String = "default", @arg var flag: Boolean = false) extends CommandLineTaskTesting {
+  override def build(): Unit = throw new IllegalStateException("PipelineBuildFailure: build() always fails") // test fixture: always throws so the failure-reporting path is exercised
+}
diff --git a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
index f8acab17..bae6c9c7 100644
--- a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
+++ b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
@@ -128,10 +128,39 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
     tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true)
   }
 
+  it should "run an empty task" in {
+    val task = Task.empty
+    val taskManager: TestTaskManager = getDefaultTaskManager()
+    taskManager.addTask(task)
+    val taskMap = taskManager.runToCompletion(true)
+    taskMap.size shouldBe 1 // only the empty task itself was added
+    val taskInfo = taskMap.valueFor(task).value // OptionValues: a missing entry fails the test instead of throwing NoSuchElementException
+    TaskStatus.isTaskDone(taskInfo.status, failedIsDone = false) shouldBe true
+  }
+
+  it should "run an empty task as part of a pipeline" in {
+    val pipeline = new Pipeline() {
+      name = "Pipeline"
+      override def build(): Unit = {
+        def newTask(name: String) = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty withName name // NOTE(review): the first withName("exit 0") looks redundant; presumably the later withName overrides it
+        val middle = Task.empty // the empty task is chained between two shell commands
+        root ==> newTask("exit0-1") ==> middle ==> newTask("exit0-2")
+      }
+    }
+
+    val taskManager: TestTaskManager = getDefaultTaskManager()
+    taskManager.addTask(pipeline)
+    val taskMap = taskManager.runToCompletion(true)
+    taskMap.size shouldBe 4 // presumably the pipeline itself plus the three tasks chained in build() — TODO confirm
+    taskMap.foreach { case (_, info) =>
+      TaskStatus.isTaskDone(info.status, failedIsDone = false) shouldBe true // every tracked task, including the empty one, must report as done
+    }
+  }
+
+
   it should "run a simple task that fails but we allow it" in {
     val task: UnitTask = new ShellCommand("exit", "1") withName "exit 1"
     val taskManager: TestTaskManager = getDefaultTaskManager()
-
     tryTaskNTimes(taskManager = taskManager, task = task, numTimes = 1, taskIsDoneFinally = true, failedAreCompletedFinally = true)
   }