From 9313a7e81b761dd28779ef57c9e0557fa58b6edb Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 25 Mar 2020 13:07:57 -0700 Subject: [PATCH] Various Task Manage Speedups (#373) * Pipeline: do not add children of tasks already added * Pipeline: solve stack overflow in addChildren * Task: find cycles in a collection of tasks all at once. When a task returns a list of tasks via getTasks, we currently check for cyclical dependencies on each task independently. If they are all connected in the DAG, then this is really slow! This avoids that by finding the strongly connected components jointly across the new *to be added* tasks. * Small performance optimizations on dense dags * TaskManager: exponential reduction in sleep time if we can do anything, otherwise linear increase * No need to check for cycles before executing a task, as it was checked when added. * Warn if a single step in execution takes longer than 30 seconds * Optimize GraphNode * update based on review * ResourceSet bug when subsetting resources and some performance optimizations (#374) * ResourceSet bug fix If the minimum to subset to is fractional, with a different fractional value than the maximum, it could be missed. * Task Manager optimizations * A few NaiveScheduler simplifications --- .../dagr/core/execsystem/GraphNode.scala | 7 +- .../dagr/core/execsystem/NaiveScheduler.scala | 30 ++--- .../dagr/core/execsystem/ResourceSet.scala | 32 ++++- .../dagr/core/execsystem/TaskManager.scala | 95 +++++++++---- .../dagr/core/execsystem/TaskTracker.scala | 125 +++++++++--------- .../scala/dagr/core/tasksystem/Pipeline.scala | 19 ++- .../scala/dagr/core/tasksystem/Task.scala | 61 +++++---- .../core/execsystem/ResourceSetTest.scala | 29 +++- .../core/execsystem/TaskManagerTest.scala | 29 ++-- .../TopLikeStatusReporterTest.scala | 8 +- .../dagr/core/tasksystem/DependableTest.scala | 8 +- .../scala/dagr/tasks/ScatterGatherTests.scala | 2 +- 12 files changed, 274 insertions(+), 171 deletions(-) diff --git a/core/src/main/scala/dagr/core/execsystem/GraphNode.scala b/core/src/main/scala/dagr/core/execsystem/GraphNode.scala index 386a44f8..3fcdcd10 100644 --- a/core/src/main/scala/dagr/core/execsystem/GraphNode.scala +++ b/core/src/main/scala/dagr/core/execsystem/GraphNode.scala @@ -43,7 +43,7 @@ class GraphNode(var task: Task, var state: GraphNodeState.Value = GraphNodeState.PREDECESSORS_AND_UNEXPANDED, val enclosingNode: Option[GraphNode] = None) extends BaseGraphNode { - private val _predecessors = new ListBuffer[GraphNode]() + private val _predecessors = new scala.collection.mutable.LinkedHashSet[GraphNode]() _predecessors ++= predecessorNodes @@ -93,9 +93,12 @@ class GraphNode(var task: Task, addPredecessors(predecessor.toSeq:_*) } + /** Gets the number of predecessors */ + def numPredecessors: Int = _predecessors.size + /** Get the predecessors * * @return the current set of predecessors, if any */ - def predecessors: List[GraphNode] = _predecessors.toList + def predecessors: Iterable[GraphNode] = _predecessors } diff --git a/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala b/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala index 61099fbc..bc91efa2 100644 --- a/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala +++ b/core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala @@ -36,22 +36,15 @@ class NaiveScheduler extends Scheduler { remainingSystemMemory: Memory, remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = { val systemResourceSet: ResourceSet = 
ResourceSet(remainingSystemCores, remainingSystemMemory) - val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory) + val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory) // Find the first task that can be executed readyTasks - .view // lazy - .map { // pick resources - case task: ProcessTask => (task, task.pickResources(systemResourceSet)) - case task: InJvmTask => (task, task.pickResources(jvmResourceSet)) - } - .find { // find the first that returned a resource set - case (_, Some(resourceSet)) => true - case _ => false - } - .map { // get the resource set - case (task, Some(resourceSet)) => (task, resourceSet) - case _ => throw new IllegalStateException("BUG") + .view + .flatMap { // pick resources + case task: ProcessTask => task.pickResources(systemResourceSet).map { rs => (task, rs) } + case task: InJvmTask => task.pickResources(jvmResourceSet).map { rs => (task, rs) } } + .headOption } /** Runs one round of scheduling, trying to schedule as many ready tasks as possible given the @@ -67,9 +60,9 @@ class NaiveScheduler extends Scheduler { private def scheduleOnce(readyTasks: Iterable[UnitTask], remainingSystemCores: Cores, remainingSystemMemory: Memory, - remainingJvmMemory: Memory): List[(UnitTask, ResourceSet)] = { + remainingJvmMemory: Memory): Seq[(UnitTask, ResourceSet)] = { // no more tasks ready to be scheduled - if (readyTasks.isEmpty) Nil + if (readyTasks.isEmpty) Seq.empty else { logger.debug(s"the resources were [System cores=" + remainingSystemCores.value + " System memory=" + Resource.parseBytesToSize(remainingSystemMemory.value) @@ -77,20 +70,19 @@ class NaiveScheduler extends Scheduler { // try one round of scheduling, and recurse if a task could be scheduled scheduleOneTask(readyTasks, remainingSystemCores, remainingSystemMemory, remainingJvmMemory) match { - case None => - Nil + case None => Seq.empty case Some((task: UnitTask, resourceSet: ResourceSet)) => logger.debug("task to schedule is [" + task.name + "]") logger.debug(s"task [${task.name}] uses the following resources [" + resourceSet + "]") List[(UnitTask, ResourceSet)]((task, resourceSet)) ++ (task match { - case processTask: ProcessTask => + case _: ProcessTask => scheduleOnce( readyTasks = readyTasks.filterNot(t => t == task), remainingSystemCores = remainingSystemCores - resourceSet.cores, remainingSystemMemory = remainingSystemMemory - resourceSet.memory, remainingJvmMemory = remainingJvmMemory ) - case inJvmTask: InJvmTask => + case _: InJvmTask => scheduleOnce( readyTasks = readyTasks.filterNot(t => t == task), remainingSystemCores = remainingSystemCores - resourceSet.cores, diff --git a/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala b/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala index b63f289e..45318873 100644 --- a/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala +++ b/core/src/main/scala/dagr/core/execsystem/ResourceSet.scala @@ -56,13 +56,35 @@ case class ResourceSet(cores: Cores = Cores.none, memory: Memory = Memory.none) /** * Constructs a subset of this resource set with a fixed amount of memory and a variable - * number of cores. Will greedily assign the highest number of cores possible. + * number of cores. Will greedily assign the highest number of cores possible. If the maximum cores is a whole + * number, then a whole number will be returned, unless the minimum cores (which can be a fractional number of cores) + * is the only valid value. 
If the maximum cores is not a whole number, then the maximum fractional amount will be
+   * returned.
+   *
+   * Example 1: minCores=1, maxCores=5, this.cores=4.5, then 4 cores will be returned.
+   * Example 2: minCores=1, maxCores=5.1, this.cores=4.5, then 4.5 cores will be returned.
+   * Example 3: minCores=1, maxCores=5, this.cores=1.5, then 1 core will be returned.
+   * Example 4: minCores=1.5, maxCores=5, this.cores=1.5, then 1.5 cores will be returned.
    */
   def subset(minCores: Cores, maxCores: Cores, memory: Memory) : Option[ResourceSet] = {
-    val min = minCores.value
-    val max = maxCores.value
-    val cores = Range.BigDecimal.inclusive(max, min, -1).find(cores => subset(Cores(cores.doubleValue), memory).isDefined)
-    cores.map(c => ResourceSet(Cores(c.doubleValue), memory))
+    if (!subsettable(ResourceSet(minCores, memory))) None else {
+      val coresValue = {
+        // Try to return a whole number value if maxCores is a whole number. If no whole number exists that is greater
+        // than or equal to minCores, then just use minCores (which could be fractional). If maxCores is fractional,
+        // then return a fractional value.
+        if (maxCores.value.isValidInt) {
+          // Get the number of cores, but rounded down to get a whole number value
+          val minValue = Math.floor(Math.min(this.cores.value, maxCores.value))
+          // If the number rounded down is smaller than the min-cores, then just return the min-cores
+          if (minValue < minCores.value) minCores.value else minValue
+        } else { // any fractional number will do
+          Math.min(this.cores.value, maxCores.value)
+        }
+      }
+      val resourceSet = ResourceSet(Cores(coresValue), memory)
+      require(subsettable(resourceSet))
+      Some(resourceSet)
+    }
   }
 
   /**
diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala
index d3046884..588346e2 100644
--- a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala
+++ b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala
@@ -24,13 +24,16 @@
 package dagr.core.execsystem
 
 import java.nio.file.Path
-import java.time.Instant
+import java.time.{Duration, Instant}
 
 import dagr.core.DagrDef._
 import com.fulcrumgenomics.commons.util.LazyLogging
 import dagr.core.tasksystem._
 import com.fulcrumgenomics.commons.collection.BiMap
 import com.fulcrumgenomics.commons.io.{Io, PathUtil}
+import dagr.core.execsystem
+
+import scala.annotation.tailrec
 
 /** The resources needed for the task manager */
 object SystemResources {
@@ -80,10 +83,22 @@ object TaskManagerDefaults extends LazyLogging {
 object TaskManager extends LazyLogging {
   import dagr.core.execsystem.TaskManagerDefaults._
 
+  /** The initial time to wait between scheduling tasks. */
+  val InitialSleepMillis: Int = 100
+  /** The minimum time to wait between scheduling tasks. */
+  val MinSleepMillis: Int = 10
+  /** The maximum time to wait between scheduling tasks. */
+  val MaxSleepMillis: Int = 1000
+  /** The amount of time added to the wait between scheduling tasks when nothing could be done (linear increase). */
+  val StepSleepMillis: Int = 50
+  /** The factor by which the wait time between scheduling tasks is divided when work was done (exponential backoff). */
+  val BackoffSleepFactor: Float = 2f
+  /** The duration, in seconds, of a single scheduling step above which a warning is logged. */
+  val SlowStepTimeSeconds: Int = 30
+
   /** Runs a given task to either completion, failure, or inability to schedule. This will terminate tasks that were still running before returning.
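+   * Note: the wait between scheduling attempts is now adjusted automatically (see [[InitialSleepMillis]] and the
+   * related constants above), so the former `sleepMilliseconds` parameter has been removed.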
* * @param task the task to run - * @param sleepMilliseconds the time to wait in milliseconds to wait between trying to schedule tasks. * @param taskManagerResources the set of task manager resources, otherwise we use the default * @param scriptsDirectory the scripts directory, otherwise we use the default * @param logDirectory the log directory, otherwise we use the default @@ -92,7 +107,6 @@ object TaskManager extends LazyLogging { * @return a bi-directional map from the set of tasks to their execution information. */ def run(task: Task, - sleepMilliseconds: Int = 1000, taskManagerResources: Option[SystemResources] = Some(defaultTaskManagerResources), scriptsDirectory: Option[Path] = None, logDirectory: Option[Path] = None, @@ -105,8 +119,7 @@ object TaskManager extends LazyLogging { scriptsDirectory = scriptsDirectory, logDirectory = logDirectory, scheduler = scheduler.getOrElse(defaultScheduler), - simulate = simulate, - sleepMilliseconds = sleepMilliseconds + simulate = simulate ) taskManager.addTask(task = task) @@ -114,6 +127,7 @@ object TaskManager extends LazyLogging { taskManager.taskToInfoBiMapFor } + } /** A manager of tasks. @@ -125,16 +139,17 @@ object TaskManager extends LazyLogging { * @param logDirectory the log directory, otherwise a temporary directory will be used * @param scheduler the scheduler, otherwise we use the default * @param simulate true if we are to simulate running tasks, false otherwise - * @param sleepMilliseconds the time to wait in milliseconds to wait between trying to schedule tasks. */ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.defaultTaskManagerResources, scriptsDirectory: Option[Path] = None, logDirectory: Option[Path] = None, scheduler: Scheduler = TaskManagerDefaults.defaultScheduler, - simulate: Boolean = false, - sleepMilliseconds: Int = 250 + simulate: Boolean = false + ) extends TaskManagerLike with TaskTracker with FinalStatusReporter with LazyLogging { + private var curSleepMilliseconds: Int = TaskManager.InitialSleepMillis + private val actualScriptsDirectory = scriptsDirectory getOrElse Io.makeTempDir("scripts") protected val actualLogsDirectory = logDirectory getOrElse Io.makeTempDir("logs") @@ -313,20 +328,21 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de */ private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = { val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks() - val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]).toSeq - val completedTaskIds = completedTasks.keys ++ emptyTasks.map(_.taskId) + val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]) emptyTasks.foreach { node => node.taskInfo.status = TaskStatus.SUCCEEDED + processCompletedTask(node.taskId) logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed") } - completedTaskIds.foreach { taskId => + completedTasks.keysIterator.foreach { taskId => processCompletedTask(taskId) val name = this(taskId).task.name val status = this(taskId).taskInfo.status logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]") } + completedTasks } @@ -344,7 +360,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de logger.debug("updateOrphans: found an orphan task [" + node.task.name + "] that has [" + predecessorsOf(task=node.task).getOrElse(Nil).size + "] 
predecessors") node.addPredecessors(predecessorsOf(task=node.task).get) - logger.debug("updateOrphans: orphan task [" + node.task.name + "] now has [" + node.predecessors.size + "] predecessors") + logger.debug("updateOrphans: orphan task [" + node.task.name + "] now has [" + node.numPredecessors + "] predecessors") // update its state node.state = PREDECESSORS_AND_UNEXPANDED }) @@ -358,14 +374,15 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de * to have no predecessors: [[NO_PREDECESSORS]]. If we find the former case, we need to perform this procedure again, * since some tasks go strait to succeeded and we may have successor tasks (children) that can now execute. */ + @tailrec private def updatePredecessors(): Unit = { var hasMore = false - for (node <- graphNodesWithPredecessors) { - node.predecessors.filter(p => p.state == GraphNodeState.COMPLETED && TaskStatus.isTaskDone(p.taskInfo.status, failedIsDone=false)).map(p => node.removePredecessor(p)) - logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor) + graphNodesWithPredecessors.foreach { node => + node.predecessors.filter(p => p.state == GraphNodeState.COMPLETED && TaskStatus.isTaskDone(p.taskInfo.status, failedIsDone=false)).foreach(p => node.removePredecessor(p)) + //logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor) // - if this node has already been expanded and now has no predecessors, then move it to the next state. // - if it hasn't been expanded and now has no predecessors, it should get expanded later - if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}") + //if (!node.hasPredecessor) logger.debug(s"updatePredecessors: has node state: ${node.state}") if (!node.hasPredecessor && node.state == ONLY_PREDECESSORS) { val taskInfo = node.taskInfo node.task match { @@ -375,7 +392,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de taskInfo.status = TaskStatus.SUCCEEDED hasMore = true // try again for all successors, since we have more nodes that have completed } - logger.debug(s"updatePredecessors: task [${node.task.name}] now has node state [${node.state}] and status [${taskInfo.status}]") + //logger.debug(s"updatePredecessors: task [${node.task.name}] now has node state [${node.state}] and status [${taskInfo.status}]") } } @@ -386,7 +403,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de /** Invokes `getTasks` on the task associated with the graph node. * - * (1) In the case that `getTasks` returns the same exact task, check for cycles and verify it is a [[UnitTask]]. Since + * (1) In the case that `getTasks` returns the same exact task, it verifies it is a [[UnitTask]]. Since * the task already has an execution node, it must have already passed to [[addTask()]]. * * (2) In the case that `getTasks` returns a different task, or more than one task, set the submission date of the node, @@ -410,8 +427,10 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de false case x :: Nil if x == node.task => // one task and it returned itself logger.debug(f"invokeGetTasks 2 ${node.task.name} : ${tasks.map(_.name).mkString(", ")}") + // Developer note: removing the check for cycles here because we should have checked for cycles when the task + // was added! 
// check for cycles only when we have a unit task for which calling [[getTasks] returns itself. - checkForCycles(task = node.task) + // checkForCycles(task = node.task) // verify we have a UnitTask node.task match { case _: UnitTask => @@ -425,10 +444,10 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de // we will make this task dependent on the tasks it creates... if (tasks.contains(node.task)) throw new IllegalStateException(s"Task [${node.task.name}] contained itself in the list returned by getTasks") // track the new tasks. If they are already added, that's fine too. - val taskIds: Seq[TaskId] = tasks.map { task => addTask(task = task, enclosingNode = Some(node), ignoreExists = true) } + val taskIds: Seq[TaskId] = addTasks(tasks, enclosingNode = Some(node), ignoreExists = true) // make this node dependent on those tasks taskIds.map(taskId => node.addPredecessors(this(taskId))) - // we may need to update precedessors if a returned task was already completed + // we may need to update predecessors if a returned task was already completed if (tasks.flatMap(t => graphNodeFor(t)).exists(_.state == GraphNodeState.COMPLETED)) updatePredecessors() // TODO: we could check each new task to see if they are in the PREDECESSORS_AND_UNEXPANDED state true @@ -544,8 +563,8 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap // get the tasks that are eligible for execution (tasks with no dependents) - val (emptyTasks: List[Task], readyTasks: List[Task]) = { - graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toList.partition(_.isInstanceOf[Task.EmptyTask]) + val (emptyTasks: Seq[Task], readyTasks: Seq[Task]) = { + graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toSeq.partition(_.isInstanceOf[Task.EmptyTask]) } logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks") @@ -571,6 +590,14 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de logger.debug("stepExecution: finishing one round of execution") logger.debug("stepExecution: found " + runningTasks.size + " running tasks and " + tasksToSchedule.size + " tasks to schedule") + // Update the current sleep time: exponential reduction if we could do **anything**, otherwise linear increase. 
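+    // For example, starting from InitialSleepMillis (100ms), three consecutive idle steps yield waits of 150ms,
+    // 200ms and 250ms, while two subsequent productive steps shrink the wait to 125ms and then 62ms, always
+    // bounded by MinSleepMillis and MaxSleepMillis.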
+ if (canDoAnything) { + curSleepMilliseconds = Math.max(TaskManager.MinSleepMillis, (curSleepMilliseconds / TaskManager.BackoffSleepFactor).toInt) + } + else { + curSleepMilliseconds = Math.min(TaskManager.MaxSleepMillis, curSleepMilliseconds + TaskManager.StepSleepMillis) + } + ( readyTasks, tasksToSchedule.keys, @@ -582,12 +609,28 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de override def runToCompletion(failFast: Boolean): BiMap[Task, TaskExecutionInfo] = { var allDone = false while (!allDone) { + + // Step the execution once + val startTime = Instant.now() val (readyTasks, tasksToSchedule, runningTasks, _) = stepExecution() - Thread.sleep(sleepMilliseconds) + // Warn if the single step in execution "took a long time" + val stepExecutionDuration = Duration.between(startTime, Instant.now()).getSeconds + if (stepExecutionDuration > TaskManager.SlowStepTimeSeconds) { + logger.warning("*" * 80) + logger.warning(s"A single step in execution was > ${TaskManager.SlowStepTimeSeconds}s (${stepExecutionDuration}s).") + val infosByStatus: Map[execsystem.TaskStatus.Value, Iterable[TaskExecutionInfo]] = this.taskToInfoBiMapFor.values.groupBy(_.status) + TaskStatus.values.filter(infosByStatus.contains).foreach { status => + logger.warning(s"Found ${infosByStatus(status).size} tasks with status: $status") + } + logger.warning("*" * 80) + } + + logger.debug(s"Sleeping ${curSleepMilliseconds}ms") + if (curSleepMilliseconds > 0) Thread.sleep(curSleepMilliseconds) - // check if we have only completed or orphan all tasks - allDone = graphNodesInStatesFor(List(ORPHAN, COMPLETED)).size == graphNodes.size + // check if we have only completed or orphan tasks + allDone = allGraphNodesInStates(Set(ORPHAN, COMPLETED)) if (!allDone && runningTasks.isEmpty && tasksToSchedule.isEmpty) { if (readyTasks.nonEmpty) { diff --git a/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala b/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala index b6fa80aa..8d615221 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskTracker.scala @@ -28,7 +28,7 @@ import java.time.Instant import com.fulcrumgenomics.commons.CommonsDef._ import com.fulcrumgenomics.commons.collection.BiMap -import com.fulcrumgenomics.commons.util.LazyLogging +import com.fulcrumgenomics.commons.util.{LazyLogging, SimpleCounter} import dagr.core.DagrDef._ import dagr.core.execsystem.TaskStatus._ import dagr.core.tasksystem.Task @@ -66,73 +66,67 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { private val idToTask: mutable.Map[TaskId, Task] = mutable.Map[TaskId, Task]() private val idToNode: mutable.Map[TaskId, GraphNode] = mutable.Map[TaskId, GraphNode]() - override def addTask(task: Task): TaskId = { - addTask(task=task, enclosingNode=None, ignoreExists=false) - } - - /** Adds a task to be managed - * - * Throws an [[IllegalArgumentException]] if a cycle was found after logging each strongly connected component with - * a cycle in the graph. - * - * @param ignoreExists true if we just return the task id for already added tasks, false if we are to throw an [[IllegalArgumentException]] - * @param task the given task. - * @return the task identifier. + /** Adds a task to be managed, but does not check for cycles, or that the next identifier is currently used. This + * should be performed by the caller. 
*/ - protected[execsystem] def addTask(task: Task, enclosingNode: Option[GraphNode], ignoreExists: Boolean = false): TaskId = { - // Make sure the id we will assign the task are not being tracked. - if (idToTask.contains(nextId)) throw new IllegalArgumentException(s"Task '${task.name}' with id '$nextId' was already added!") - if (idToNode.contains(nextId)) throw new IllegalArgumentException(s"Task '${task.name}' with id '$nextId' was already added!") - - taskFor(task) match { - case Some(id) if ignoreExists => id - case Some(id) => throw new IllegalArgumentException(s"Task '${task.name}' with id '$id' was already added!") - case None => - // check for cycles - checkForCycles(task = task) - - // set the task id - val id = yieldAndThen(nextId) {nextId += 1} - // set the task info - require(task._taskInfo.isEmpty) // should not have any info! - val info = new TaskExecutionInfo( - task=task, - taskId=id, - status=UNKNOWN, - script=scriptPathFor(task=task, id=id, attemptIndex=1), - logFile=logPathFor(task=task, id=id, attemptIndex=1), - submissionDate=Some(Instant.now()) - ) - task._taskInfo = Some(info) - - // create the graph node - val node = predecessorsOf(task=task) match { - case None => new GraphNode(task=task, predecessorNodes=Nil, state=GraphNodeState.ORPHAN, enclosingNode=enclosingNode) - case Some(predecessors) => new GraphNode(task=task, predecessorNodes=predecessors, enclosingNode=enclosingNode) - } + private def addTaskNoChecking(task: Task, enclosingNode: Option[GraphNode] = None): TaskId = { + // set the task id + val id = yieldAndThen(nextId) {nextId += 1} + // set the task info + require(task._taskInfo.isEmpty) // should not have any info! + val info = new TaskExecutionInfo( + task = task, + taskId = id, + status = UNKNOWN, + script = scriptPathFor(task=task, id=id, attemptIndex=1), + logFile = logPathFor(task=task, id=id, attemptIndex=1), + submissionDate = Some(Instant.now()) + ) + task._taskInfo = Some(info) + + // create the graph node + val node = predecessorsOf(task=task) match { + case None => new GraphNode(task=task, predecessorNodes=Nil, state=GraphNodeState.ORPHAN, enclosingNode=enclosingNode) + case Some(predecessors) => new GraphNode(task=task, predecessorNodes=predecessors, enclosingNode=enclosingNode) + } - // update the lookups - idToTask.put(id, task) - idToNode.put(id, node) + // update the lookups + idToTask.put(id, task) + idToNode.put(id, node) - id - } + id } - /** Adds tasks to be managed + /** Adds tasks to be managed. * * @param tasks the given tasks. + * @param enclosingNode the graph node of the parent task that generated this task (if any) * @param ignoreExists true if we just return the task id for already added tasks, false if we are to throw an [[IllegalArgumentException]] * @return the task identifiers. */ - protected[execsystem] def addTasks(tasks: Iterable[Task], enclosingNode: Option[GraphNode] = None, ignoreExists: Boolean = false): List[TaskId] = { - tasks.map(task => addTask(task=task, enclosingNode=enclosingNode, ignoreExists=ignoreExists)).toList - } + protected[execsystem] def addTasks(tasks: Seq[Task], enclosingNode: Option[GraphNode] = None, ignoreExists: Boolean = false): Seq[TaskId] = { + // Make sure the id we will assign the task are not being tracked. 
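+    // The tasks are then filtered down to those not yet added (honoring ignoreExists), the new tasks are checked
+    // for cyclical dependencies as a single group, and finally each task is registered, re-using the existing id
+    // for tasks that were already added.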
+ if (idToTask.contains(nextId)) throw new IllegalArgumentException(s"Task id '$nextId' was already added!") + + val tasksToAdd = tasks.flatMap { task => + taskFor(task) match { + case Some(_) if ignoreExists => None + case Some(id) => throw new IllegalArgumentException(s"Task '${task.name}' with id '$id' was already added!") + case None => Some(task) + } + } + + checkForCycles(tasksToAdd:_*) - override def addTasks(tasks: Task*): Seq[TaskId] = { - tasks.map(task => addTask(task, enclosingNode=None, ignoreExists=false)) + tasks.map { task => taskFor(task).getOrElse(addTaskNoChecking(task, enclosingNode)) } } + /** Adds a task to be managed. */ + override def addTask(task: Task): TaskId = addTasks(task).head + + /** Adds one or more tasks to be managed. */ + override def addTasks(tasks: Task*): Seq[TaskId] = this.addTasks(tasks, enclosingNode=None, ignoreExists=false) + override def taskFor(id: TaskId): Option[Task] = idToTask.get(id) override def taskExecutionInfoFor(id: TaskId): Option[TaskExecutionInfo] = idToNode.get(id).map(_.taskInfo) @@ -182,6 +176,8 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { */ def graphNodes: Iterable[GraphNode] = idToNode.values + def numGraphNodes: Int = idToNode.size + override def apply(id: TaskId): GraphNode = { this.graphNodeFor(id=id) match { case Some(node) => node @@ -191,7 +187,7 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { override def taskToInfoBiMapFor: BiMap[Task, TaskExecutionInfo] = { val map: BiMap[Task, TaskExecutionInfo] = new BiMap[Task, TaskExecutionInfo]() - idToTask.foreach { case (id, task) => map.add(task, task.taskInfo) } + idToTask.foreach { case (_, task) => map.add(task, task.taskInfo) } map } @@ -259,24 +255,22 @@ trait TaskTracker extends TaskManagerLike with LazyLogging { * * @param task a task in the graph to check. */ - protected def checkForCycles(task: Task): Unit = { + protected def checkForCycles(task: Task*): Unit = { // check for cycles - if (Task.hasCycle(task)) { + if (Task.hasCycle(task:_*)) { logger.error("Task was part of a graph that had a cycle") - for (component <- Task.findStronglyConnectedComponents(task = task)) { + for (component <- Task.findStronglyConnectedComponents(task = task:_*)) { if (Task.isComponentACycle(component = component)) { logger.error("Tasks were part of a strongly connected component with a cycle: " + component.map(t => s"[${t.name}]").mkString(", ")) } } - throw new IllegalArgumentException(s"Task was part of a graph that had a cycle [${task.name}]") + throw new IllegalArgumentException(s"Task(s) had cyclical dependencies [${task.map(_.name).mkString(",")}]") } } /** Returns true if all the predecessors of this task are known (have been added), false otherwise. */ - protected def allPredecessorsAdded(task: Task): Boolean = { - predecessorsOf(task = task).nonEmpty - } + protected def allPredecessorsAdded(task: Task): Boolean = predecessorsOf(task = task).nonEmpty /** Gets the predecessor graph nodes of the given task in the execution graph. 
   *
   *
@@ -321,4 +315,9 @@ trait TaskTracker extends TaskManagerLike with LazyLogging {
   protected def graphNodesWithPredecessors: Iterable[GraphNode] = {
     graphNodes.filter(node => GraphNodeState.hasPredecessors(node.state))
   }
+
+  protected def allGraphNodesInStates(states: Set[GraphNodeState.Value]): Boolean = {
+    graphNodes.forall(n => states.contains(n.state))
+  }
+
 }
diff --git a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
index 910616a8..d2b76adf 100644
--- a/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
+++ b/core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
@@ -28,6 +28,8 @@ import java.nio.file.Path
 import com.fulcrumgenomics.commons.io.Io
 import com.fulcrumgenomics.commons.util.LazyLogging
 
+import scala.collection.mutable
+
 /** Simple trait to track tasks within a pipeline */
 abstract class Pipeline(val outputDirectory: Option[Path] = None,
                         private var prefix: Option[String] = None,
@@ -78,8 +80,21 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None,
 
   /** Recursively navigates dependencies, starting from the supplied task, and add all children to this.tasks. */
   private def addChildren(task : Task) : Unit = {
-    tasks ++= task.tasksDependingOnThisTask
-    task.tasksDependingOnThisTask.foreach(addChildren)
+    // Developer note: we may have very deep dependency graphs, so this implementation avoids stack overflows
+    // Developer note: we use a Set here so that we do not visit the same task twice.
+    // Suppose we have `A ==> (B :: C)` and `B ==> C`.  Even though this could be simplified to `A ==> B ==> C`, that's
+    // up to the caller, and we do no post-processing of the DAG here.  So when `addChildren` gets called on `A`, it
+    // visits `B` and `C`.  Since `C` also depends on `B`, without the uniqueness check we would visit `C` a second
+    // time via the children of `B`.
+    val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task)
+    while (toVisit.nonEmpty) {
+      val nextTask: Task = toVisit.head
+      toVisit -= nextTask
+      nextTask.tasksDependingOnThisTask.filterNot(tasks.contains).foreach { child =>
+        tasks += child
+        toVisit += child
+      }
+    }
   }
 
   /** True if we this pipeline is tracking this direct ancestor task, false otherwise. */
diff --git a/core/src/main/scala/dagr/core/tasksystem/Task.scala b/core/src/main/scala/dagr/core/tasksystem/Task.scala
index 4f66f29a..e81f0892 100644
--- a/core/src/main/scala/dagr/core/tasksystem/Task.scala
+++ b/core/src/main/scala/dagr/core/tasksystem/Task.scala
@@ -26,6 +26,7 @@ package dagr.core.tasksystem
 import com.fulcrumgenomics.commons.CommonsDef.unreachable
 import dagr.core.execsystem.TaskExecutionInfo
 
+import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 import scala.util.control.Breaks._
@@ -57,39 +58,40 @@ object Task {
    * @param task the task to begin search.
    * @return true if the DAG to which this task belongs has a cycle, false otherwise.
    */
-  private[core] def hasCycle(task: Task): Boolean = {
-    findStronglyConnectedComponents(task).exists(component => isComponentACycle(component))
+  private[core] def hasCycle(task: Task*): Boolean = {
+    findStronglyConnectedComponents(task:_*).exists(component => isComponentACycle(component))
   }
 
   /** Finds all the strongly connected components of the graph to which this task is connected.
    *
    * Uses Tarjan's algorithm: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
    *
-   * @param task a task in the graph to check.
+   * @param task one or more tasks in the graph to check.
    * @return the set of strongly connected components.
    */
-  private[core] def findStronglyConnectedComponents(task: Task): Set[Set[Task]] = {
+  private[core] def findStronglyConnectedComponents(task: Task*): Iterator[Set[Task]] = {
 
     // 1. find all tasks connected to this task
     val visited: mutable.Set[Task] = new mutable.HashSet[Task]()
-    val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task)
+    val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task:_*)
     while (toVisit.nonEmpty) {
       val nextTask: Task = toVisit.head
       toVisit -= nextTask
-      (nextTask.tasksDependedOn.toList ::: nextTask.tasksDependingOnThisTask.toList).foreach(t => if (!visited.contains(t)) toVisit += t)
+      nextTask.tasksDependedOn.foreach(t => if (!visited.contains(t)) toVisit += t)
+      nextTask.tasksDependingOnThisTask.foreach(t => if (!visited.contains(t)) toVisit += t)
       visited += nextTask
     }
 
     // 2. Runs Tarjan's strongly connected components algorithm
     val data: TarjanData = new TarjanData
-    visited.filterNot(data.indexes.contains).foreach(v => findStronglyConnectedComponent(v, data))
+    visited.iterator.filterNot(data.indexes.contains).foreach(v => findStronglyConnectedComponent(v, data))
 
     // return all the components
-    data.components.map(component => component.toSet).toSet
+    data.components.iterator.filter(_.nonEmpty).map(component => component.toSet)
   }
 
-  /** Indicates if a given set of tasks that are strongly connected components contains a cycle. This is the
+  /** Indicates if a given strongly connected component (a set of tasks) contains a cycle. This is the
    * case if the set size is greater than one, or the task is depends on itself. See [[Task.findStronglyConnectedComponents()]]
    * for how to retrieve strongly connected components from a task.
* @@ -101,8 +103,8 @@ object Task { else { component.headOption match { case Some(task) => - task.tasksDependedOn.toSet.contains(task) || - task.tasksDependingOnThisTask.toSet.contains(task) + task.tasksDependedOn.iterator.contains(task) || + task.tasksDependingOnThisTask.iterator.contains(task) case _ => false } } @@ -118,30 +120,35 @@ object Task { data.onStack += v // Consider successors of v - for(w <- v.tasksDependedOn) { // could alternatively use task.getTasksDependingOnThisTask + for (w <- v.tasksDependedOn) { // could alternatively use task.getTasksDependingOnThisTask if (!data.indexes.contains(w)) { // Successor w has not yet been visited; recurse on it findStronglyConnectedComponent(w, data) - data.lowLink.put(v, math.min(data.lowLink.get(v).get, data.lowLink.get(w).get)) + data.lowLink.put(v, math.min(data.lowLink(v), data.lowLink(w))) } else if (data.onStack(w)) { // Successor w is in stack S and hence in the current SCC - data.lowLink.put(v, math.min(data.lowLink.get(v).get, data.lowLink.get(w).get)) + data.lowLink.put(v, math.min(data.lowLink(v), data.lowLink(w))) } } // If v is a root node, pop the stack and generate an SCC - if (data.indexes.get(v).get == data.lowLink.get(v).get) { + if (data.indexes(v) == data.lowLink(v)) { val component: mutable.Set[Task] = new mutable.HashSet[Task]() - breakable { - while (data.stack.nonEmpty) { - val w: Task = data.stack.pop() - data.onStack -= w - component += w - if (w == v) break - } - } - data.components += component + buildComponent(data, v, component) + if (component.nonEmpty) data.components += component + } + } + + @tailrec + private def buildComponent(data: TarjanData, v: Task, component: mutable.Set[Task]): Unit = { + if (data.onStack.isEmpty) () + else { + val w: Task = data.stack.pop() + data.onStack -= w + component += w + if (w == v) () + else buildComponent(data, v, component) } } } @@ -190,9 +197,9 @@ trait Task extends Dependable { /** Removes this as a dependency for other */ override def !=>(other: Dependable): Unit = other.headTasks.foreach(_.removeDependency(this)) - override def headTasks: Iterable[Task] = Seq(this) - override def tailTasks: Iterable[Task] = Seq(this) - override def allTasks: Iterable[Task] = Seq(this) + override def headTasks: Iterable[Task] = Some(this) + override def tailTasks: Iterable[Task] = Some(this) + override def allTasks: Iterable[Task] = Some(this) /** * Removes a dependency by removing the supplied task from the list of dependencies for this task diff --git a/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala b/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala index 7ffe19d6..0bb42018 100644 --- a/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/ResourceSetTest.scala @@ -25,8 +25,9 @@ package dagr.core.execsystem import dagr.core.UnitSpec +import org.scalatest.OptionValues -class ResourceSetTest extends UnitSpec { +class ResourceSetTest extends UnitSpec with OptionValues { "ResourceSet.isEmpty" should "return true for the empty resource set" in { ResourceSet.empty.isEmpty shouldBe true } @@ -43,4 +44,30 @@ class ResourceSetTest extends UnitSpec { running = running - Cores(10) running.cores.value shouldBe 0 } + + it should "subset resources" in { + // doc examples + ResourceSet(4.5, 10).subset(Cores(1), Cores(5), Memory(10)).value shouldBe ResourceSet(4, 10) + ResourceSet(4.5, 10).subset(Cores(1), Cores(5.1), Memory(10)).value shouldBe ResourceSet(4.5, 10) + ResourceSet(1.5, 10).subset(Cores(1), 
Cores(5), Memory(10)).value shouldBe ResourceSet(1, 10) + ResourceSet(1.5, 10).subset(Cores(1.5), Cores(5), Memory(10)).value shouldBe ResourceSet(1.5, 10) + + val resources = ResourceSet(10, 10) + resources.subset(ResourceSet(10, 10)).value shouldBe ResourceSet(10, 10) + resources.subset(ResourceSet(10.5, 10)).isDefined shouldBe false + resources.subset(ResourceSet(9.5, 10)).value shouldBe ResourceSet(9.5, 10) + resources.subset(ResourceSet(5, 5)).value shouldBe ResourceSet(5, 5) + resources.subset(ResourceSet(4.5, 5)).value shouldBe ResourceSet(4.5, 5) + + val halfACore = ResourceSet(0.5, 10) + halfACore.subset(ResourceSet(0.5, 10)).value shouldBe ResourceSet(0.5, 10) + halfACore.subset(Cores(0.25), Cores(0.5), Memory(10)).value shouldBe ResourceSet(0.5, 10) + halfACore.subset(Cores(0.5), Cores(1), Memory(10)).value shouldBe ResourceSet(0.5, 10) + halfACore.subset(Cores(0.1), Cores(1), Memory(10)).value shouldBe ResourceSet(0.1, 10) + halfACore.subset(Cores(0.51), Cores(1), Memory(10)).isDefined shouldBe false + halfACore.subset(Cores(0.25), Cores(0.3), Memory(10)).value shouldBe ResourceSet(0.3, 10) + + val fiveAndAHalfCores = ResourceSet(5.5, 10) + fiveAndAHalfCores.subset(Cores(1.2), Cores(5.8), Memory(10)).value shouldBe ResourceSet(5.5, 10) + } } diff --git a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala index bae6c9c7..9b4c5419 100644 --- a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala @@ -56,10 +56,9 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B override def beforeAll(): Unit = Logger.level = LogLevel.Fatal override def afterAll(): Unit = Logger.level = LogLevel.Info - def getDefaultTaskManager(sleepMilliseconds: Int = 10): TestTaskManager = new TaskManager( + def getDefaultTaskManager(): TestTaskManager = new TaskManager( taskManagerResources = SystemResources.infinite, - scriptsDirectory = None, - sleepMilliseconds = sleepMilliseconds + scriptsDirectory = None ) with TestTaskManager private def runSchedulerOnce(taskManager: TestTaskManager, @@ -97,8 +96,8 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B "TaskManager" should "not overwrite an existing task when adding a task, or throw an IllegalArgumentException when ignoreExists is false" in { val task: UnitTask = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty val taskManager: TestTaskManager = getDefaultTaskManager() - taskManager.addTasks(tasks=Seq(task, task), ignoreExists=true) shouldBe List(0, 0) - an[IllegalArgumentException] should be thrownBy taskManager.addTask(task=task, enclosingNode=None, ignoreExists=false) + taskManager.addTasks(tasks=Seq(task, task), enclosingNode=None, ignoreExists=true) shouldBe List(0, 0) + an[IllegalArgumentException] should be thrownBy taskManager.addTasks(tasks=Seq(task), enclosingNode=None, ignoreExists=false) } it should "get the task status for only tracked tasks" in { @@ -190,7 +189,6 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B def runSimpleEndToEnd(task: UnitTask = new ShellCommand("exit", "0") withName "exit 0", simulate: Boolean): Unit = { val map: BiMap[Task, TaskExecutionInfo] = TaskManager.run( task = task, - sleepMilliseconds = 10, taskManagerResources = Some(SystemResources.infinite), scriptsDirectory = None, simulate = simulate, @@ -222,7 +220,7 @@ class TaskManagerTest 
extends UnitSpec with OptionValues with LazyLogging with B val longTask: UnitTask = new ShellCommand("sleep", "1000") withName "sleep 1000" val failedTask: UnitTask = new ShellCommand("exit", "1") withName "exit 1" - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds=1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(longTask, failedTask) taskManager.runToCompletion(failFast=true) taskManager.taskStatusFor(failedTask).value should be(TaskStatus.FAILED_COMMAND) @@ -233,7 +231,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "not schedule and run tasks that have failed dependencies" in { val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString)) a ==> b ==> c - val tm = getDefaultTaskManager(sleepMilliseconds=1) + val tm = getDefaultTaskManager() tm.addTasks(a, b, c) tm.runToCompletion(failFast=false) tm.taskStatusFor(a).value shouldBe TaskStatus.SUCCEEDED @@ -247,7 +245,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "not schedule and run tasks that have failed dependencies and complete all when failed tasks are manually succeeded" in { val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString)) a ==> b ==> c - val tm = getDefaultTaskManager(sleepMilliseconds=1) + val tm = getDefaultTaskManager() tm.addTasks(a, b, c) tm.runToCompletion(failFast=false) tm.taskStatusFor(a).value shouldBe TaskStatus.SUCCEEDED @@ -882,7 +880,6 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B TaskManager.run( new HungryPipeline, - sleepMilliseconds = 1, taskManagerResources = Some(SystemResources(systemCores, Resource.parseSizeToBytes("8g").toLong, 0.toLong)), failFast=true ) @@ -915,7 +912,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B } // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(tasks) // run the tasks @@ -948,7 +945,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B } // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(pipeline) // run the tasks @@ -987,7 +984,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B // NB: the execution is really: root ==> firstTask ==> secondTask // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTasks(outerPipeline) // run the tasks @@ -1022,7 +1019,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "mark a task as failed when one of its children fails" in { val parent = new ParentFailTask() - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTask(parent) taskManager.runToCompletion(failFast=true) Seq(parent.child, parent).foreach { task => @@ -1037,7 +1034,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "mark a pipeline as failed when one of its children fails" in { val pipeline = new FailPipeline() - val 
taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTask(pipeline) taskManager.runToCompletion(failFast=true) Seq(pipeline.child, pipeline).foreach { task => @@ -1055,7 +1052,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B tasks += pipeline pipeline } - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager() taskManager.addTask(root) taskManager.runToCompletion(failFast=true) tasks.foreach { task => diff --git a/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala b/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala index 87564e80..5f7fd61e 100644 --- a/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TopLikeStatusReporterTest.scala @@ -57,10 +57,9 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with } } - private def getDefaultTaskManager(sleepMilliseconds: Int = 10): TaskManager = new TaskManager( + private def getDefaultTaskManager(): TaskManager = new TaskManager( taskManagerResources = SystemResources.infinite, - scriptsDirectory = None, - sleepMilliseconds = sleepMilliseconds + scriptsDirectory = None ) "Terminal" should "support ANSI codes" in { @@ -156,8 +155,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with val printMethod: String => Unit = (str: String) => output.append(str) val taskManager = new TaskManager( taskManagerResources = SystemResources(1.0, Long.MaxValue, Long.MaxValue), // one task at a time - scriptsDirectory = None, - sleepMilliseconds = 10 + scriptsDirectory = None ) val reporter = new TopLikeStatusReporter(taskManager = taskManager, print = printMethod) with TestTerminal diff --git a/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala b/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala index 69833e91..4988126b 100644 --- a/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala +++ b/core/src/test/scala/dagr/core/tasksystem/DependableTest.scala @@ -125,13 +125,13 @@ class DependableTest extends UnitSpec { k.tasksDependedOn should contain theSameElementsAs Seq(i) } - "Pipeline.root" should "return the same things as Pipeline from the *tasks methods" in { + "Pipeline.root" should "return the same things as Pipeline from the *tasks methods*" in { val pipeline = new Pipeline() { override def build() = root ==> (A :: B :: C) ==> (X :: Y :: Z) } - pipeline.root.headTasks shouldBe pipeline.headTasks - pipeline.root.tailTasks shouldBe pipeline.tailTasks - pipeline.root.allTasks shouldBe pipeline.allTasks + pipeline.root.headTasks.toList should contain theSameElementsInOrderAs pipeline.headTasks + pipeline.root.tailTasks.toList should contain theSameElementsInOrderAs pipeline.tailTasks + pipeline.root.allTasks.toList should contain theSameElementsInOrderAs pipeline.allTasks } } diff --git a/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala b/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala index a74bceb9..c83798bf 100644 --- a/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala +++ b/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala @@ -46,7 +46,7 @@ import org.scalatest.BeforeAndAfterAll class ScatterGatherTests extends UnitSpec with LazyLogging with BeforeAndAfterAll { override def beforeAll(): Unit = Logger.level = 
LogLevel.Fatal override def afterAll(): Unit = Logger.level = LogLevel.Info - def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None, sleepMilliseconds=1) + def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None) def tmp(prefix: Option[String] = None): Path = { val path = Files.createTempFile(prefix.getOrElse("testScatterGather."), ".txt")