From db43946d132f1c863f5d0ee83a92a49a80fb49cc Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Sat, 10 Jun 2017 11:32:08 -0700 Subject: [PATCH] Major refactor again. Starting support for replay in the main task system (shouldn't be too much more effort) Test should be failing at the moment! --- build.sbt | 1 + .../dagr/core/cmdline/DagrCoreMain.scala | 94 +++++-------- .../core/{execsystem2 => exec}/ExecDef.scala | 6 +- .../main/scala/dagr/core/exec/Executor.scala | 57 ++++++++ .../replay => exec}/TaskCache.scala | 12 +- .../dagr/core/execsystem/TaskManager.scala | 27 +++- .../core/execsystem/TaskManagerLike.scala | 2 +- .../dagr/core/execsystem/TaskStatus.scala | 33 +++-- ...rter.scala => TopLikeStatusReporter.scala} | 15 ++- .../core/execsystem2/DependencyGraph.scala | 1 + .../dagr/core/execsystem2/GraphExecutor.scala | 47 +------ .../dagr/core/execsystem2/TaskInfo.scala | 1 + .../dagr/core/execsystem2/TaskStatus.scala | 54 +++++--- .../execsystem2/TopLikeStatusReporter.scala | 20 +-- .../execsystem2/local/LocalTaskExecutor.scala | 4 +- .../dagr/core/reporting/ExecutionLogger.scala | 2 - .../reporting/TopLikeStatusReporter.scala | 126 +++++++++--------- .../scala/dagr/core/tasksystem/Task.scala | 23 +++- .../core/execsystem/TaskManagerTest.scala | 33 ++--- .../execsystem2/GraphExecutorImplTest.scala | 1 + .../core/execsystem2/GraphExecutorTest.scala | 1 + .../execsystem2/GraphExecutorUnitSpec.scala | 1 + .../dagr/core/execsystem2/TaskInfoTest.scala | 1 + ....scala => TopLikeStatusReporterTest.scala} | 12 +- .../local/LocalTaskExecutorTest.scala | 1 + .../local/LocalTaskRunnerTest.scala | 3 +- .../reporting/TopLikeStatusReporterTest.scala | 88 +++++++----- .../scala/dagr/tasks/ScatterGatherTests.scala | 6 +- 28 files changed, 372 insertions(+), 300 deletions(-) rename core/src/main/scala/dagr/core/{execsystem2 => exec}/ExecDef.scala (93%) create mode 100644 core/src/main/scala/dagr/core/exec/Executor.scala rename core/src/main/scala/dagr/core/{execsystem2/replay => exec}/TaskCache.scala (95%) rename core/src/main/scala/dagr/core/execsystem/{TaskManagerReporter.scala => TopLikeStatusReporter.scala} (83%) rename core/src/test/scala/dagr/core/execsystem2/{TaskLoggerTest.scala => TopLikeStatusReporterTest.scala} (91%) diff --git a/build.sbt b/build.sbt index d9236135..e5ee5b6c 100644 --- a/build.sbt +++ b/build.sbt @@ -123,6 +123,7 @@ lazy val core = Project(id="dagr-core", base=file("core")) "com.fulcrumgenomics" %% "commons" % "0.2.0-SNAPSHOT", "com.fulcrumgenomics" %% "sopt" % "0.2.0-SNAPSHOT", "com.github.dblock" % "oshi-core" % "3.3", + "com.beachape" %% "enumeratum" % "1.5.12", "org.scala-lang" % "scala-reflect" % scalaVersion.value, "org.scala-lang" % "scala-compiler" % scalaVersion.value, "org.reflections" % "reflections" % "0.9.10", diff --git a/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala b/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala index b6acd115..ee52ff97 100644 --- a/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala +++ b/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala @@ -23,7 +23,7 @@ */ package dagr.core.cmdline -import java.io.{ByteArrayOutputStream, PrintStream, PrintWriter} +import java.io.PrintWriter import java.net.InetAddress import java.nio.file.{Files, Path} import java.text.DecimalFormat @@ -36,14 +36,13 @@ import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, A import com.fulcrumgenomics.sopt.util.TermCode import com.fulcrumgenomics.sopt.{Sopt, arg} import dagr.core.config.Configuration -import dagr.core.exec.{Cores, Memory} +import dagr.core.exec.{Cores, Executor, Memory, TaskCache} import dagr.core.execsystem._ import dagr.core.execsystem2.GraphExecutor -import dagr.core.execsystem2.{TopLikeStatusReporter => TopLikeStatusReporter2} import dagr.core.execsystem2.local.LocalTaskExecutor -import dagr.core.execsystem2.replay.TaskCache -import dagr.core.reporting.{ExecutionLogger, FinalStatusReporter, PeriodicRefreshingReporter, Terminal} +import dagr.core.reporting.{ExecutionLogger, Terminal, TopLikeStatusReporter} import dagr.core.tasksystem.Pipeline +import dagr.core.tasksystem.Task.TaskInfo import scala.collection.mutable.ListBuffer import scala.concurrent.ExecutionContext @@ -119,8 +118,7 @@ class DagrCoreArgs( ) extends LazyLogging { // These are not optional, but are only populated during configure() - private var taskManager : Option[TaskManager] = None - private var taskExecutor: Option[LocalTaskExecutor] = None + private var executor : Option[Executor] = None private var reportPath : Option[Path] = None // Initialize the configuration as early as possible @@ -163,10 +161,11 @@ class DagrCoreArgs( val resources = SystemResources(cores = cores.map(Cores(_)), totalMemory = memory.map(Memory(_))) if (experimentalExecution) { - this.taskExecutor = Some(new LocalTaskExecutor(systemResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory)) + val taskExecutor = new LocalTaskExecutor(systemResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory) + this.executor = Some( GraphExecutor(taskExecutor)) } else { - this.taskManager = Some(new TaskManager(taskManagerResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory)) + this.executor = Some(new TaskManager(taskManagerResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory)) } // Print all the arguments if desired. @@ -188,72 +187,41 @@ class DagrCoreArgs( */ protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = { val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()")) - // FIXME: should this path be exposed on the command line? - val executionLog = { - if (report == Io.StdOut.toAbsolutePath) None // FIXME - else Some(report.getParent.resolve("execution_log.txt")) - } - - if (interactive && !Terminal.supportsAnsi) { - logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.") - interactive = false - None - } - def toLoggerOutputStream(): ByteArrayOutputStream = { - val loggerOutputStream = new ByteArrayOutputStream() - val loggerPrintStream = new PrintStream(loggerOutputStream) - Logger.out = loggerPrintStream - loggerOutputStream - } + // Get the executor + val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()")) - val (finalStatusReporter: FinalStatusReporter, exitCode: Int) = if (experimentalExecution) { - val taskExecutor = this.taskExecutor.getOrElse(throw new IllegalStateException("execute() called before configure()")) - val graphExecutor = GraphExecutor(taskExecutor) - if (interactive) { - val reporter = new TopLikeStatusReporter2( - systemResources = taskExecutor.resources, - loggerOut = Some(toLoggerOutputStream()), - print = s => System.out.print(s) - ) - graphExecutor.withLogger(reporter) + // Set up an interactive logger if desired and supported + if (this.interactive) { + if (Terminal.supportsAnsi) { + TaskInfo.withLogger(TopLikeStatusReporter(executor)) } - executionLog.foreach { log => - val executionLogger = new ExecutionLogger(log) - graphExecutor.withLogger(executionLogger) - graphExecutor.withTaskRegister(executionLogger) - } - this.replayLog.foreach { log => - val taskCache = TaskCache(log) - graphExecutor.withTaskCache(taskCache) + else { + logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.") } - (graphExecutor, graphExecutor execute pipeline) } - else { - val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()")) - val interactiveReporter: Option[PeriodicRefreshingReporter] = if (!interactive) { None } else { - val reporter = new TaskManagerReporter(taskManager=taskMan) - Some(new PeriodicRefreshingReporter(reporter=reporter, loggerOut=Some(toLoggerOutputStream()), print = s => System.out.print(s))) - } - interactiveReporter.foreach(_.start()) - - taskMan.addTask(pipeline) - taskMan.runToCompletion(this.failFast) - - interactiveReporter.foreach(_.shutdown()) + // Set up the execution logger + // FIXME: should this path be exposed on the command line? + if (!Seq(Io.StdOut, Io.DevNull).map(_.toAbsolutePath).contains(report)) { + val log = report.getParent.resolve("execution_log.txt") + val executionLogger = new ExecutionLogger(log) + TaskInfo.withLogger(executionLogger) + executor.withTaskRegister(executionLogger) + } - // return an exit code based on the number of non-completed tasks - val code = taskMan.taskToInfoBiMapFor.count { case (_, info) => - TaskStatus.notDone(info.status, failedIsDone=false) - } - (taskMan, code) + // Set up the task cache (in case of replay) + this.replayLog.foreach { log => + executor.withTaskCache(TaskCache(log)) } + // execute + val exitCode = executor.execute(pipeline) + // Write out the execution report if (!interactive || Io.StdOut != report) { val pw = new PrintWriter(Io.toWriter(report)) - finalStatusReporter.logReport({ str: String => pw.write(str + "\n") }) + executor.logReport({ str: String => pw.write(str + "\n") }) pw.close() } diff --git a/core/src/main/scala/dagr/core/execsystem2/ExecDef.scala b/core/src/main/scala/dagr/core/exec/ExecDef.scala similarity index 93% rename from core/src/main/scala/dagr/core/execsystem2/ExecDef.scala rename to core/src/main/scala/dagr/core/exec/ExecDef.scala index 6445b958..19498b3f 100644 --- a/core/src/main/scala/dagr/core/execsystem2/ExecDef.scala +++ b/core/src/main/scala/dagr/core/exec/ExecDef.scala @@ -1,7 +1,7 @@ /* * The MIT License * - * Copyright (c) 2017 Fulcrum Genomics LLC + * Copyright (c) 2017 Fulcrum GenomicsLLC * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,11 +23,11 @@ * */ -package dagr.core.execsystem2 +package dagr.core.exec import scala.collection.mutable -private[execsystem2] object ExecDef { +object ExecDef { /** Create a thread-safe mutable map. */ def concurrentMap[A,B](): mutable.Map[A,B] = { import scala.collection.convert.decorateAsScala._ diff --git a/core/src/main/scala/dagr/core/exec/Executor.scala b/core/src/main/scala/dagr/core/exec/Executor.scala new file mode 100644 index 00000000..bfe5d4e8 --- /dev/null +++ b/core/src/main/scala/dagr/core/exec/Executor.scala @@ -0,0 +1,57 @@ +/* + * The MIT License + * + * Copyright (c) 2017 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +package dagr.core.exec + +import dagr.core.execsystem2.TaskStatusLogger +import dagr.core.reporting.FinalStatusReporter +import dagr.core.reporting.ReportingDef.{TaskLogger, TaskRegister} +import dagr.core.tasksystem.Task +import com.fulcrumgenomics.commons.CommonsDef.yieldAndThen + +import scala.collection.mutable.ListBuffer + +/** All executors of tasks should extend this trait. */ +trait Executor extends FinalStatusReporter { + + /** A list of [[TaskRegister]]s that will be notified when a list of tasks is returned by [[Task.getTasks]]. */ + protected val registers: ListBuffer[TaskRegister] = ListBuffer[TaskRegister]() + + /** A list of [[TaskCache]] to use to determine if a task should be manually succeeded. */ + protected val taskCaches: ListBuffer[TaskCache] = ListBuffer[TaskCache]() + + /** Adds the [[TaskRegister]] to the list of registers to be notified when a list of tasks is returned by [[Task.getTasks]] */ + def withTaskRegister(register: TaskRegister): this.type = yieldAndThen[this.type](this)(this.registers.append(register)) + + /** Adds the [[TaskCache]] to the list of caches to use to determine if a task should be manually succeeded. */ + def withTaskCache(taskCache: TaskCache): this.type = yieldAndThen[this.type](this) { + withTaskRegister(taskCache) + this.taskCaches.append(taskCache) + } + + /** Start the execution of this task and all tasks that depend on it. Returns the number of tasks that were not + * executed. A given task should only be attempted once. */ + def execute(task: Task): Int +} diff --git a/core/src/main/scala/dagr/core/execsystem2/replay/TaskCache.scala b/core/src/main/scala/dagr/core/exec/TaskCache.scala similarity index 95% rename from core/src/main/scala/dagr/core/execsystem2/replay/TaskCache.scala rename to core/src/main/scala/dagr/core/exec/TaskCache.scala index 5c3a81cd..d2be061c 100644 --- a/core/src/main/scala/dagr/core/execsystem2/replay/TaskCache.scala +++ b/core/src/main/scala/dagr/core/exec/TaskCache.scala @@ -23,12 +23,11 @@ * */ -package dagr.core.execsystem2.replay +package dagr.core.exec import com.fulcrumgenomics.commons.CommonsDef.FilePath import com.fulcrumgenomics.commons.io.Io import com.fulcrumgenomics.commons.util.LazyLogging -import dagr.core.execsystem2.{ManuallySucceeded, SucceededExecution} import dagr.core.reporting.ExecutionLogger.{Definition, Relationship, Status} import dagr.core.reporting.ReportingDef.TaskRegister import dagr.core.tasksystem.Task @@ -159,12 +158,13 @@ class SimpleTaskCache(replayLog: FilePath) extends TaskCache with LazyLogging { /** Updates tasksToExecute if the task should be executed. */ private def maybeSetTaskToExecute(task: Task, taskReplayDefinition: Definition): Unit = { - val status = this.statuses.filter { status => status.definitionCode == taskReplayDefinition.code } + val succeededExecution = this.statuses.filter { status => status.definitionCode == taskReplayDefinition.code } .sortBy(-_.statusOrdinal) + .map(status => task.taskInfo.status.from(status.statusOrdinal).success) .headOption - .getOrElse(Status(taskReplayDefinition.code, "", -1)) // the OrElse occurs if no status was set - // NB: this is specific to execsystem2 - if (status.statusOrdinal != SucceededExecution.ordinal && status.statusOrdinal != ManuallySucceeded.ordinal) { + .getOrElse(false) + + if (succeededExecution) { // execute it logger.debug(s"Adding task ${task.name} to execute") this.tasksToExecute += task diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala index c7855e9d..4fe96f0f 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala @@ -31,7 +31,6 @@ import com.fulcrumgenomics.commons.io.{Io, PathUtil} import com.fulcrumgenomics.commons.util.LazyLogging import dagr.core.DagrDef._ import dagr.core.exec._ -import dagr.core.reporting.FinalStatusReporter import dagr.core.tasksystem._ /** The resources needed for the task manager */ @@ -110,11 +109,12 @@ object TaskManager extends LazyLogging { logDirectory = logDirectory, scheduler = scheduler.getOrElse(defaultScheduler), simulate = simulate, - sleepMilliseconds = sleepMilliseconds + sleepMilliseconds = sleepMilliseconds, + failFast = failFast ) taskManager.addTask(task = task) - taskManager.runToCompletion(failFast=failFast) + taskManager.runToCompletion() taskManager.taskToInfoBiMapFor } @@ -136,8 +136,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de logDirectory: Option[Path] = None, scheduler: Scheduler = TaskManagerDefaults.defaultScheduler, simulate: Boolean = false, - sleepMilliseconds: Int = 1000 -) extends TaskManagerLike with TaskTracker with FinalStatusReporter with LazyLogging { + sleepMilliseconds: Int = 1000, + failFast: Boolean = false +) extends TaskManagerLike with TaskTracker with Executor with LazyLogging { private val actualScriptsDirectory = scriptsDirectory getOrElse Io.makeTempDir("scripts") protected val actualLogsDirectory = logDirectory getOrElse Io.makeTempDir("logs") @@ -549,7 +550,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de ) } - override def runToCompletion(failFast: Boolean): BiMap[Task, TaskExecutionInfo] = { + override def runToCompletion(): BiMap[Task, TaskExecutionInfo] = { var allDone = false while (!allDone) { val (readyTasks, tasksToSchedule, runningTasks, _) = stepExecution() @@ -582,7 +583,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de allDone = true } - else if (failFast && hasFailedTasks) { + else if (this.failFast && hasFailedTasks) { allDone = true } } @@ -609,4 +610,16 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de taskToInfoBiMapFor } + + override def execute(task: Task): Int = { + this.addTask(task) + this.runToCompletion() + this.taskToInfoBiMapFor.count { case (_, info) => + TaskStatus.notDone(info.status, failedIsDone=false) + } + } + + // TODO: + // support TaskRegister + // support TaskCache } diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala b/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala index a7649de3..de4a4778 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala @@ -143,7 +143,7 @@ private[execsystem] trait TaskManagerLike { * * @return a bi-directional map from the set of tasks to their execution information. */ - def runToCompletion(failFast: Boolean): BiMap[Task, TaskExecutionInfo] + def runToCompletion(): BiMap[Task, TaskExecutionInfo] /** Run a a single iteration of managing tasks. * diff --git a/core/src/main/scala/dagr/core/execsystem/TaskStatus.scala b/core/src/main/scala/dagr/core/execsystem/TaskStatus.scala index c51b9edd..9cd5b9f8 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskStatus.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskStatus.scala @@ -23,13 +23,22 @@ */ package dagr.core.execsystem +import dagr.core.execsystem.TaskStatus.{ManuallySucceeded, SucceededExecution} +import dagr.core.tasksystem.Task import dagr.core.tasksystem.Task.{TaskStatus => RootTaskStatus} +import enumeratum.values.{IntEnum, IntEnumEntry} -sealed trait TaskStatus extends RootTaskStatus { - override def toString: String = description +sealed abstract class TaskStatus extends IntEnumEntry with Task.TaskStatus { + override def ordinal = this.value + /** Returns true if this status indicates any type of success, false otherwise. */ + def success: Boolean = this.ordinal == SucceededExecution.ordinal || this.ordinal == ManuallySucceeded.ordinal + /** Returns the task status by ordinal */ + def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal) } -object TaskStatus { +case object TaskStatus extends IntEnum[TaskStatus] { + val values = findValues + /** Checks if a task with a given status is done. * * @param taskStatus the status of the task @@ -64,17 +73,17 @@ object TaskStatus { sealed trait Succeeded extends Completed // High-level statuses - case object Unknown extends TaskStatus { val description: String = "is unknown"; val ordinal: Int = 0 } - case object Started extends TaskStatus { val description: String = "has been started"; val ordinal: Int = 1 } - case object Stopped extends Completed { val description: String = "has been stopped"; val ordinal: Int = 2 } + case object Unknown extends TaskStatus { val description: String = "is unknown"; val value: Int = 0 } + case object Started extends TaskStatus { val description: String = "has been started"; val value: Int = 1 } + case object Stopped extends Completed { val description: String = "has been stopped"; val value: Int = 2 } // Statuses after execution has completed - case object FailedGetTasks extends Failed { val description: String = "has failed (could not get the list of tasks)"; val ordinal: Int = 3 } - case object FailedScheduling extends Failed { val description: String = "has failed (could not start executing after scheduling)"; val ordinal: Int = 4 } - case object FailedExecution extends Failed { val description: String = "has failed (execution)"; val ordinal: Int = 5 } - case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val ordinal: Int = 6 } - case object SucceededExecution extends Succeeded { val description: String = "has succeeded"; val ordinal: Int = 7 } - case object ManuallySucceeded extends Succeeded { val description: String = "has succeeded (manually)"; val ordinal: Int = 8 } + case object FailedGetTasks extends Failed { val description: String = "has failed (could not get the list of tasks)"; val value: Int = 3 } + case object FailedScheduling extends Failed { val description: String = "has failed (could not start executing after scheduling)"; val value: Int = 4 } + case object FailedExecution extends Failed { val description: String = "has failed (execution)"; val value: Int = 5 } + case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val value: Int = 6 } + case object SucceededExecution extends Succeeded { val description: String = "has succeeded"; val value: Int = 7 } + case object ManuallySucceeded extends Succeeded { val description: String = "has succeeded (manually)"; val value: Int = 8 } val TaskStatuses = Seq(Unknown, Started, Stopped, FailedGetTasks, FailedScheduling, FailedExecution, FailedOnComplete, SucceededExecution, ManuallySucceeded) } diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManagerReporter.scala b/core/src/main/scala/dagr/core/execsystem/TopLikeStatusReporter.scala similarity index 83% rename from core/src/main/scala/dagr/core/execsystem/TaskManagerReporter.scala rename to core/src/main/scala/dagr/core/execsystem/TopLikeStatusReporter.scala index 0e1b892b..359343e2 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManagerReporter.scala +++ b/core/src/main/scala/dagr/core/execsystem/TopLikeStatusReporter.scala @@ -25,24 +25,27 @@ package dagr.core.execsystem +import java.io.ByteArrayOutputStream + import dagr.core.execsystem.GraphNodeState.NO_PREDECESSORS -import dagr.core.reporting.TopLikeStatusReporter +import dagr.core.reporting.{TopLikeStatusReporter => BaseTopLikeStatusReporter} import dagr.core.tasksystem.Task /** A simple reporter for [[dagr.core.execsystem.TaskManager]]. */ -class TaskManagerReporter(taskManager: TaskManager) extends TopLikeStatusReporter { +class TopLikeStatusReporter(taskManager: TaskManager, + protected val loggerOut: Option[ByteArrayOutputStream] = None, + protected val print: String => Unit = print) extends BaseTopLikeStatusReporter { /** the total system resources */ protected def systemResources: SystemResources = taskManager.getTaskManagerResources - /** the set of all tests about which are currently known */ - protected def tasks: Traversable[Task] = taskManager.taskToInfoBiMapFor.keys - /** True if the task is running, false otherwise. */ protected def running(task: Task): Boolean = task.taskInfo.status == TaskStatus.Started // taskManager.running(task) /** True if the task is ready for execution (no dependencies), false otherwise. */ - protected def queued(task: Task): Boolean = taskManager.graphNodeFor(task).get.state == GraphNodeState.NO_PREDECESSORS + protected def queued(task: Task): Boolean = taskManager.graphNodeFor(task).exists { node => + node.state == GraphNodeState.NO_PREDECESSORS && task.taskInfo.status == TaskStatus.Unknown + } /** True if the task has failed, false otherwise. */ protected def failed(task: Task): Boolean = TaskStatus.failed(task.taskInfo.status) diff --git a/core/src/main/scala/dagr/core/execsystem2/DependencyGraph.scala b/core/src/main/scala/dagr/core/execsystem2/DependencyGraph.scala index bf03110f..9d8bc0e8 100644 --- a/core/src/main/scala/dagr/core/execsystem2/DependencyGraph.scala +++ b/core/src/main/scala/dagr/core/execsystem2/DependencyGraph.scala @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import com.fulcrumgenomics.commons.util.LazyLogging +import dagr.core.exec.ExecDef import dagr.core.tasksystem.Task /** diff --git a/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala b/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala index 3deca8fc..cd762c75 100644 --- a/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala +++ b/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala @@ -26,14 +26,13 @@ package dagr.core.execsystem2 import com.fulcrumgenomics.commons.util.LazyLogging -import dagr.core.execsystem2.replay.TaskCache -import dagr.core.reporting.FinalStatusReporter -import dagr.core.reporting.ReportingDef.{TaskLogger, TaskRegister} +import dagr.core.exec.{ExecDef, Executor} +import dagr.core.execsystem.SystemResources import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo} import dagr.core.tasksystem.{Retry, Task} +import dagr.core.execsystem2.TaskStatus._ import scala.collection.mutable -import scala.collection.mutable.ListBuffer import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future, blocking} import scala.util.{Failure, Success} @@ -41,7 +40,7 @@ import scala.util.{Failure, Success} /** Coordinates between the dependency graph ([[DependencyGraph]]) and task executor ([[TaskExecutor]]) given a (root) * task to execute. */ -trait GraphExecutor[T<:Task] extends FinalStatusReporter { +trait GraphExecutor[T<:Task] extends Executor { /** Start the execution of this task and all tasks that depend on it. Returns the number of tasks that were not * executed. A given task should only be attempted once. */ def execute(rootTask: Task): Int @@ -50,16 +49,8 @@ trait GraphExecutor[T<:Task] extends FinalStatusReporter { * a task may not yet be contained in the graph. */ def contains(task: Task): Boolean - /** Adds the [[TaskLogger]] to the list of loggers to be notified when a task's status is updated. */ - def withLogger(logger: TaskLogger): this.type - - /** Adds the [[TaskRegister]] to the list of registers to be notified when a list of tasks is returned by [[Task.getTasks]] */ - def withTaskRegister(register: TaskRegister): this.type - - /** Adds the [[TaskCache]] to the list of caches to use to determine if a task should be manually succeeded. */ - def withTaskCache(taskCache: TaskCache): this.type = { - withTaskRegister(taskCache) - } + /** Returns the resources used by the underlying task executor(s). */ + def resources: Option[SystemResources] = Some(this.taskExecutor.resources) /** Returns the executor that execute tasks. */ protected def taskExecutor: TaskExecutor[T] @@ -95,28 +86,6 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto /** A lock to synchronize when the task execution or dependency information is updated. */ private val lock: Object = (dependencyGraph, _tasks) - /** A list of [[TaskLogger]]s that will be notified when a task's status is updated. */ - private val loggers: ListBuffer[TaskLogger] = ListBuffer[TaskLogger](new TaskStatusLogger) - - /** A list of [[TaskRegister]]s that will be notified when a list of tasks is returned by [[Task.getTasks]]. */ - private val registers: ListBuffer[TaskRegister] = ListBuffer[TaskRegister]() - - /** A list of [[TaskCache]] to use to determine if a task should be manually succeeded. */ - private val taskCaches: ListBuffer[TaskCache] = ListBuffer[TaskCache]() - - /** Adds the [[TaskLogger]] to the list of loggers to be notified when a task's status is updated. */ - def withLogger(logger: TaskLogger): this.type = { this.loggers.append(logger); this } - - /** Adds the [[TaskRegister]] to the list of registers to be notified when a list of tasks is returned by [[Task.getTasks]] */ - def withTaskRegister(register: TaskRegister): this.type = { this.registers.append(register); this } - - /** Adds the [[TaskCache]] to the list of caches to use to determine if a task should be manually succeeded. */ - override def withTaskCache(taskCache: TaskCache): this.type = { - super.withTaskCache(taskCache) - this.taskCaches.append(taskCache) - this - } - /** The tasks currently known by the executor. */ def tasks: Traversable[Task] = this._tasks @@ -371,13 +340,9 @@ private class GraphExecutorImpl[T<:Task](protected val taskExecutor: TaskExecuto initStatus = status ) this._tasks.add(task) - this.loggers.foreach { logger => logger.record(task.taskInfo) } } else { - // only log it if the status changes - val logIt = task.taskInfo.status != status task.taskInfo.status = status - if (logIt) this.loggers.foreach { logger => logger.record(task.taskInfo) } } task.taskInfo } diff --git a/core/src/main/scala/dagr/core/execsystem2/TaskInfo.scala b/core/src/main/scala/dagr/core/execsystem2/TaskInfo.scala index 7097ccee..96bc1136 100644 --- a/core/src/main/scala/dagr/core/execsystem2/TaskInfo.scala +++ b/core/src/main/scala/dagr/core/execsystem2/TaskInfo.scala @@ -28,6 +28,7 @@ package dagr.core.execsystem2 import java.time.Instant import dagr.core.tasksystem.Task +import dagr.core.execsystem2.TaskStatus._ /** [[dagr.core.tasksystem.Task.TaskInfo]] implementation specific to [[dagr.core.execsystem2]]. */ class TaskInfo(task: Task, initStatus: TaskStatus) diff --git a/core/src/main/scala/dagr/core/execsystem2/TaskStatus.scala b/core/src/main/scala/dagr/core/execsystem2/TaskStatus.scala index f391abd4..f508c9a2 100644 --- a/core/src/main/scala/dagr/core/execsystem2/TaskStatus.scala +++ b/core/src/main/scala/dagr/core/execsystem2/TaskStatus.scala @@ -25,30 +25,42 @@ package dagr.core.execsystem2 +import dagr.core.execsystem2.TaskStatus.{ManuallySucceeded, SucceededExecution} import dagr.core.tasksystem.Task +import enumeratum.values.{IntEnum, IntEnumEntry} /** The root of all task statuses in [[dagr.core.execsystem2]]. */ -sealed trait TaskStatus extends Task.TaskStatus +sealed abstract class TaskStatus extends IntEnumEntry with Task.TaskStatus { + override def ordinal = this.value + /** Returns true if this status indicates any type of success, false otherwise. */ + def success: Boolean = this.ordinal == SucceededExecution.ordinal || this.ordinal == ManuallySucceeded.ordinal + /** Returns the task status by ordinal */ + def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal) +} -/** Trait for all statuses prior to submission for execution */ -sealed trait PreSubmission extends TaskStatus -case object Pending extends PreSubmission { val description: String = "Has unmet dependencies"; val ordinal: Int = 0 } -case object Queued extends PreSubmission { val description: String = "Ready for execution"; val ordinal: Int = 1 } +case object TaskStatus extends IntEnum[TaskStatus] { + val values = findValues -/** Trait for all statuses during execution */ -sealed trait Executing extends TaskStatus -case object Submitted extends Executing { val description: String = "Submitted for execution"; val ordinal: Int = 2 } -case object Running extends Executing { val description: String = "Executing"; val ordinal: Int = 3 } + /** Trait for all statuses prior to submission for execution */ + sealed trait PreSubmission extends TaskStatus + case object Pending extends PreSubmission { val description: String = "Has unmet dependencies"; val value: Int = 0 } + case object Queued extends PreSubmission { val description: String = "Ready for execution"; val value: Int = 1 } -/** Trait for all statuses after execution has completed */ -sealed trait Completed extends TaskStatus -sealed trait Failed extends Completed -sealed trait Succeeded extends Completed -case object Stopped extends Completed { val description: String = "Stopped prior to completion"; val ordinal: Int = 4 } -case object FailedToBuild extends Failed { val description: String = "Failed to build the task"; val ordinal: Int = 5 } -case object FailedSubmission extends Failed { val description: String = "Could not be submitted to the executor"; val ordinal: Int = 6 } -case object FailedExecution extends Failed { val description: String = "Failed during execution"; val ordinal: Int = 7 } -case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val ordinal: Int = 8 } -case object FailedUnknown extends Failed { val description: String = "Failed for unknown reasons"; val ordinal: Int = 9 } -case object SucceededExecution extends Succeeded { val description: String = "Succeeded execution"; val ordinal: Int = 10 } -case object ManuallySucceeded extends Succeeded { val description: String = "Manually marked as succeeded"; val ordinal: Int = 11 } + /** Trait for all statuses during execution */ + sealed trait Executing extends TaskStatus + case object Submitted extends Executing { val description: String = "Submitted for execution"; val value: Int = 2 } + case object Running extends Executing { val description: String = "Executing"; val value: Int = 3 } + + /** Trait for all statuses after execution has completed */ + sealed trait Completed extends TaskStatus + sealed trait Failed extends Completed + sealed trait Succeeded extends Completed + case object Stopped extends Completed { val description: String = "Stopped prior to completion"; val value: Int = 4 } + case object FailedToBuild extends Failed { val description: String = "Failed to build the task"; val value: Int = 5 } + case object FailedSubmission extends Failed { val description: String = "Could not be submitted to the executor"; val value: Int = 6 } + case object FailedExecution extends Failed { val description: String = "Failed during execution"; val value: Int = 7 } + case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val value: Int = 8 } + case object FailedUnknown extends Failed { val description: String = "Failed for unknown reasons"; val value: Int = 9 } + case object SucceededExecution extends Succeeded { val description: String = "Succeeded execution"; val value: Int = 10 } + case object ManuallySucceeded extends Succeeded { val description: String = "Manually marked as succeeded"; val value: Int = 11 } +} diff --git a/core/src/main/scala/dagr/core/execsystem2/TopLikeStatusReporter.scala b/core/src/main/scala/dagr/core/execsystem2/TopLikeStatusReporter.scala index ff49f513..178606c4 100644 --- a/core/src/main/scala/dagr/core/execsystem2/TopLikeStatusReporter.scala +++ b/core/src/main/scala/dagr/core/execsystem2/TopLikeStatusReporter.scala @@ -28,11 +28,12 @@ package dagr.core.execsystem2 import java.io.ByteArrayOutputStream import dagr.core.execsystem.SystemResources -import dagr.core.execsystem2.ExecDef.concurrentSet +import dagr.core.exec.ExecDef.concurrentSet import dagr.core.reporting.ReportingDef.TaskLogger import dagr.core.reporting.{TopLikeStatusReporter => BaseTopLikeStatusReporter} import dagr.core.tasksystem.Task import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo} +import dagr.core.execsystem2.TaskStatus._ import scala.collection.mutable @@ -42,23 +43,10 @@ import scala.collection.mutable * @param print the method to use to write task status information, one line at a time. */ class TopLikeStatusReporter(val systemResources: SystemResources, - loggerOut: Option[ByteArrayOutputStream] = None, - print: String => Unit = print) + protected val loggerOut: Option[ByteArrayOutputStream] = None, + protected val print: String => Unit = print) extends BaseTopLikeStatusReporter with TaskLogger { - // All the tasks we have ever known - private val _tasks: mutable.Set[Task] = concurrentSet() - - /** This method is called when any info about a task is updated. */ - def record(info: RootTaskInfo): Unit = { - // add the task to the set of known tasks - this._tasks += info.task - // refresh the screen - this.refresh(loggerOut=loggerOut, print=print) - } - - /** The set of all tests about which are currently known */ - protected def tasks: Traversable[Task] = _tasks /** True if the task is running, false otherwise. */ protected[execsystem2] def running(task: Task): Boolean = task.taskInfo.status == Running diff --git a/core/src/main/scala/dagr/core/execsystem2/local/LocalTaskExecutor.scala b/core/src/main/scala/dagr/core/execsystem2/local/LocalTaskExecutor.scala index 7ef60e9d..5e8f70aa 100644 --- a/core/src/main/scala/dagr/core/execsystem2/local/LocalTaskExecutor.scala +++ b/core/src/main/scala/dagr/core/execsystem2/local/LocalTaskExecutor.scala @@ -33,14 +33,14 @@ import com.fulcrumgenomics.commons.io.{Io, PathUtil} import com.fulcrumgenomics.commons.util.LazyLogging import dagr.core.DagrDef.TaskId import dagr.core.execsystem._ -import dagr.core.execsystem2.{ExecDef, TaskExecutor} +import dagr.core.execsystem2.TaskExecutor import dagr.core.tasksystem._ import scala.collection.mutable import scala.concurrent._ import dagr.core.execsystem2.util.InterruptableFuture.Interruptable import dagr.core.execsystem2.util.InterruptableFuture -import dagr.core.exec.{NaiveScheduler, Resource, ResourceSet, Scheduler} +import dagr.core.exec._ import scala.concurrent.duration.Duration diff --git a/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala b/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala index 399ff75f..75c0a87f 100644 --- a/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala +++ b/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala @@ -38,8 +38,6 @@ object ExecutionLogger { val Separator: String = "," - - object Definition { val Name: String = "DEFINITION" val Header: Seq[String] = Seq("SIMPLE_NAME", "NAME", "CODE", "PARENT_CODE", "CHILD_NUMBER") diff --git a/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala b/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala index 86b3bb19..8635f6c3 100644 --- a/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala +++ b/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala @@ -25,14 +25,19 @@ package dagr.core.reporting -import java.io.ByteArrayOutputStream -import java.util.concurrent.atomic.AtomicBoolean +import java.io.{ByteArrayOutputStream, PrintStream} +import com.fulcrumgenomics.commons.util.Logger import com.fulcrumgenomics.commons.util.StringUtil._ import com.fulcrumgenomics.commons.util.TimeUtil._ -import dagr.core.execsystem.SystemResources -import dagr.core.reporting.PeriodicRefreshingReporter.StatusRunnable +import dagr.core.exec.ExecDef.concurrentSet +import dagr.core.exec.Executor +import dagr.core.execsystem.{SystemResources, TaskManager} +import dagr.core.execsystem2.GraphExecutor +import dagr.core.reporting.ReportingDef.TaskLogger +import dagr.core.tasksystem.Task.TaskInfo import dagr.core.tasksystem.{InJvmTask, ProcessTask, Task, UnitTask} +import com.fulcrumgenomics.commons.CommonsDef.yieldAndThen import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -80,8 +85,8 @@ object TopLikeStatusReporter { private def wrap(output: String, maxLines: Int, maxColumns: Int): String = { var lines = output.split("\n") lines = if (maxLines <= lines.length) { - val numLinesLeft = lines.length - (maxLines-2) - lines.slice(0, maxLines-2) ++ List(s"... with $numLinesLeft more lines not shown ...") + val numLinesLeft = lines.length - (maxLines-1) + lines.slice(0, maxLines-1) ++ List(s"... with $numLinesLeft more lines not shown ...") } else { lines @@ -108,7 +113,36 @@ object TopLikeStatusReporter { " " ) } + } + + /** Creates a new [[TopLikeStatusReporter]] specific to the given executor. Will add itself to the list of loggers to + * be notified by a change in status in [[TaskInfo]]. */ + def apply(executor: Executor): TopLikeStatusReporter = { + val loggerOutputStream: ByteArrayOutputStream = { + val outStream = new ByteArrayOutputStream() + val printStream = new PrintStream(outStream) + Logger.out = printStream + outStream + } + val logger = executor match { + case taskManager: TaskManager => + new dagr.core.execsystem.TopLikeStatusReporter( + taskManager = taskManager, + loggerOut = Some(loggerOutputStream), + print = s => System.out.print(s) + ) + case graphExecutor: GraphExecutor[_] => + new dagr.core.execsystem2.TopLikeStatusReporter( + systemResources = graphExecutor.resources.getOrElse(throw new IllegalArgumentException("No resource set defined")), + loggerOut = Some(loggerOutputStream), + print = s => System.out.print(s) + ) + case _ => throw new IllegalArgumentException(s"Unknown executor: '${executor.getClass.getSimpleName}'") + } + + // Register! + yieldAndThen(logger)(TaskInfo.withLogger(logger)) } } @@ -116,16 +150,28 @@ object TopLikeStatusReporter { /** A top-like status reporter that prints execution information to the terminal. The * [[TopLikeStatusReporter#refresh()]] method is used to refresh the terminal. Currently * only displays tasks the extend [[UnitTask]]. */ -trait TopLikeStatusReporter extends Terminal { +trait TopLikeStatusReporter extends TaskLogger with Terminal { import TopLikeStatusReporter.Column.{Column => Field} import TopLikeStatusReporter.{Column, _} + /** All the tasks we have ever known */ + private val _tasks: mutable.Set[Task] = concurrentSet() + + // refresh initially + this.refresh(loggerOut=loggerOut, print=print) + + /** The set of all tests about which are currently known */ + protected def tasks: Traversable[Task] = _tasks + + /** A stream from which to read log messages. */ + protected def loggerOut: Option[ByteArrayOutputStream] + + /** The method use to print the status. */ + protected def print: String => Unit + /** the total system resources */ protected def systemResources: SystemResources - /** the set of all tests about which are currently known */ - protected def tasks: Traversable[Task] - /** True if the task is running, false otherwise. */ protected def running(task: Task): Boolean @@ -270,60 +316,12 @@ trait TopLikeStatusReporter extends Terminal { numLinesLeft -= taskStatusTable.count(_ == '\n') + 1 } } -} - - -object PeriodicRefreshingReporter { - /** Refreshes the top-like interface until terminate is called. - * - * @param reporter the reporter which to refresh. - * @param loggerOut the stream to which log messages are written, or none if no stream is available. - * @param refreshRate the refresh rate in ms. - */ - private class StatusRunnable(reporter: TopLikeStatusReporter, - loggerOut: Option[ByteArrayOutputStream] = None, - refreshRate: Int = 1000, - print: String => Unit = print) extends Runnable { - private val keepPrinting: AtomicBoolean = new AtomicBoolean(true) - def terminate(): Unit = keepPrinting.set(false) - override def run(): Unit = { - // skip over any current output - print(CursorMovement.clearScreen) - // keep printing until told otherwise - while (keepPrinting.get()) { - reporter.refresh(print=print, loggerOut) - Thread.sleep(refreshRate) - } - } - } - - /** The amount of time in milliseconds to wait for the thread to join */ - private val ShutdownJoinTime = 5000 -} - -/** A simple class that calls [[TopLikeStatusReporter#refresh]] periodically. */ -class PeriodicRefreshingReporter(reporter: TopLikeStatusReporter, - loggerOut: Option[ByteArrayOutputStream] = None, - print: String => Unit = print) -{ - private val terminalRunnable = new StatusRunnable(reporter=reporter, loggerOut=loggerOut, print=print) - private val terminalThread = new Thread(terminalRunnable) - - /** Start the top like status thread */ - def start(): Unit = { - terminalThread.setDaemon(true) - reporter.refresh(print=print) - terminalThread.start() - } - - /** Forces a refresh */ - def refresh(): Unit = reporter.refresh(print=print, loggerOut=loggerOut) - /** Shutdown the top like status thread */ - def shutdown(): Unit = { - terminalRunnable.terminate() - terminalThread.join(PeriodicRefreshingReporter.ShutdownJoinTime) - reporter.refresh(print=print) - print("\n") + /** This method is called when any info about a task is updated. */ + final def record(info: TaskInfo): Unit = { + // add the task to the set of known tasks + this._tasks += info.task + // refresh the screen + this.refresh(loggerOut=loggerOut, print=print) } } diff --git a/core/src/main/scala/dagr/core/tasksystem/Task.scala b/core/src/main/scala/dagr/core/tasksystem/Task.scala index 55aaf16e..60cd7469 100644 --- a/core/src/main/scala/dagr/core/tasksystem/Task.scala +++ b/core/src/main/scala/dagr/core/tasksystem/Task.scala @@ -31,6 +31,8 @@ import com.fulcrumgenomics.commons.util.TimeUtil.formatElapsedTime import dagr.core.DagrDef.TaskId import dagr.core.exec.ResourceSet import dagr.core.execsystem.TaskExecutionInfo +import dagr.core.execsystem2.TaskStatusLogger +import dagr.core.reporting.ReportingDef.TaskLogger import dagr.core.tasksystem.Task.TaskInfo import scala.collection.mutable @@ -49,6 +51,10 @@ object Task { def ordinal: Int /** The name of the status, by default the class' simple name (sanitized). */ def name: String = this.getClass.getSimpleName.replaceFirst("[$].*$", "") + /** Returns true if this status indicates any type of success, false otherwise. */ + def success: Boolean + /** Returns the task status by ordinal */ + def from(ordinal: Int): TaskStatus /** The string representation of the status, by default the definition. */ override def toString: String = this.description } @@ -56,6 +62,19 @@ object Task { /** A tuple representing the instant the task was set to the given status. */ private[core] case class TimePoint(status: TaskStatus, instant: Instant) + private[core] object TaskInfo { + /** The loggers to be notified when a task's status is updated. */ + private val _loggers: ListBuffer[TaskLogger] = ListBuffer[TaskLogger](new TaskStatusLogger) + + /** Adds the [[TaskLogger]] to the list of loggers to be notified when a task's status is updated. */ + def withLogger(logger: TaskLogger): Unit = if (!this._loggers.contains(logger)) { + this._loggers.append(logger) + } + + /** Gets all the task loggers interested in status changes. */ + private[tasksystem] def loggers: Seq[TaskLogger] = this._loggers + } + /** Execution information associated with a task. Any execution system should extend this class to store * their specific metadata. * @param task the task in question @@ -95,13 +114,14 @@ object Task { // Update the reference in [[Task]] to this. task._taskInfo = Some(this) - /** The set of time points that contains time points of the instant a status was set. */ private val _timePoints: mutable.ArrayBuffer[TimePoint] = new mutable.ArrayBuffer[TimePoint]() // set the time the status was initially set to NOW! this._timePoints.append(TimePoint(initStatus, Instant.now)) + TaskInfo.loggers.foreach(_.record(this)) // notify all of the initial status + /** Updates the instant for the status if the given status different from the current status or the current status * is not set. */ private[core] final def update(status: TaskStatus, instant: Instant): Unit = if (status != this.status) { @@ -128,6 +148,7 @@ object Task { private[core] def status_=(status: TaskStatus): Instant = { val instant = Instant.now update(status, instant) + TaskInfo.loggers.foreach(_.record(this)) // notify! instant } diff --git a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala index fee194b0..3693a446 100644 --- a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala @@ -56,10 +56,11 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B override def beforeAll(): Unit = Logger.level = LogLevel.Fatal override def afterAll(): Unit = Logger.level = LogLevel.Info - def getDefaultTaskManager(sleepMilliseconds: Int = 10): TestTaskManager = new TaskManager( + def getDefaultTaskManager(sleepMilliseconds: Int = 10, failFast: Boolean = false): TestTaskManager = new TaskManager( taskManagerResources = SystemResources.infinite, scriptsDirectory = None, - sleepMilliseconds = sleepMilliseconds + sleepMilliseconds = sleepMilliseconds, + failFast = failFast ) with TestTaskManager private def runSchedulerOnce(taskManager: TestTaskManager, @@ -193,9 +194,9 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B val longTask: UnitTask = new ShellCommand("sleep", "1000") withName "sleep 1000" val failedTask: UnitTask = new ShellCommand("exit", "1") withName "exit 1" - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds=1) + val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds=1, failFast=true) taskManager.addTasks(longTask, failedTask) - taskManager.runToCompletion(failFast=true) + taskManager.runToCompletion() taskManager.taskStatusFor(failedTask).value should be(TaskStatus.FailedExecution) taskManager.graphNodeStateFor(failedTask).value should be(GraphNodeState.COMPLETED) taskManager.taskStatusFor(longTask).value should be(TaskStatus.Stopped) @@ -204,9 +205,9 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "not schedule and run tasks that have failed dependencies" in { val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString)) a ==> b ==> c - val tm = getDefaultTaskManager(sleepMilliseconds=1) + val tm = getDefaultTaskManager(sleepMilliseconds=1, failFast=false) tm.addTasks(a, b, c) - tm.runToCompletion(failFast=false) + tm.runToCompletion() tm.taskStatusFor(a).value shouldBe TaskStatus.SucceededExecution tm.graphNodeFor(a).value.state shouldBe GraphNodeState.COMPLETED tm.taskStatusFor(b).value shouldBe TaskStatus.FailedExecution @@ -218,9 +219,9 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B it should "not schedule and run tasks that have failed dependencies and complete all when failed tasks are manually succeeded" in { val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString)) a ==> b ==> c - val tm = getDefaultTaskManager(sleepMilliseconds=1) + val tm = getDefaultTaskManager(sleepMilliseconds=1, failFast=false) tm.addTasks(a, b, c) - tm.runToCompletion(failFast=false) + tm.runToCompletion() tm.taskStatusFor(a).value shouldBe TaskStatus.SucceededExecution tm.graphNodeFor(a).value.state shouldBe GraphNodeState.COMPLETED tm.taskStatusFor(b).value shouldBe TaskStatus.FailedExecution @@ -230,7 +231,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B // manually succeed b tm.taskExecutionInfoFor(b).foreach(_.status = TaskStatus.ManuallySucceeded) - tm.runToCompletion(failFast=false) + tm.runToCompletion() tm.taskStatusFor(a).value shouldBe TaskStatus.SucceededExecution tm.graphNodeFor(a).value.state shouldBe GraphNodeState.COMPLETED tm.taskStatusFor(b).value shouldBe TaskStatus.ManuallySucceeded @@ -257,7 +258,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B originalGraphNodeState: GraphNodeState.Value, taskManager: TestTaskManager): Unit = { // run it once, make sure tasks that are failed are not marked as completed - taskManager.runToCompletion(failFast=true) + taskManager.runToCompletion() // the task identifier for 'original' should now be found val originalTaskId: TaskId = taskManager.taskFor(original).value @@ -880,11 +881,11 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B } // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1, failFast = true) taskManager.addTasks(tasks) // run the tasks - taskManager.runToCompletion(failFast=true) + taskManager.runToCompletion() // make sure all tasks have been completed tasks.foreach { task => @@ -914,11 +915,11 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B } // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1, failFast = true) taskManager.addTasks(pipeline) // run the tasks - taskManager.runToCompletion(failFast=true) + taskManager.runToCompletion() // get and check the info val pipelineInfo: TaskExecutionInfo = getAndTestTaskExecutionInfo(taskManager, pipeline) @@ -953,11 +954,11 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B // NB: the execution is really: root ==> firstTask ==> secondTask // add the tasks to the task manager - val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1) + val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1, failFast=true) taskManager.addTasks(outerPipeline) // run the tasks - taskManager.runToCompletion(failFast=true) + taskManager.runToCompletion() // get and check the info val firstTaskInfo : TaskExecutionInfo = getAndTestTaskExecutionInfo(taskManager, firstTask) diff --git a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorImplTest.scala b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorImplTest.scala index 0e56d6a0..ed20747a 100644 --- a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorImplTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorImplTest.scala @@ -37,6 +37,7 @@ import org.scalatest.PrivateMethodTester import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.{Seconds, Span} import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo} +import dagr.core.execsystem2.TaskStatus._ import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration diff --git a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala index f2b1d3f1..5dab3c7c 100644 --- a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala @@ -34,6 +34,7 @@ import dagr.core.tasksystem.Retry import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.{Seconds, Span} import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo} +import dagr.core.execsystem2.TaskStatus._ import scala.concurrent.Future import scala.concurrent.duration.Duration diff --git a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorUnitSpec.scala b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorUnitSpec.scala index 36914215..6df2e07f 100644 --- a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorUnitSpec.scala +++ b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorUnitSpec.scala @@ -34,6 +34,7 @@ import dagr.core.exec.ResourceSet import dagr.core.execsystem2.local.LocalTaskExecutor import dagr.core.tasksystem.Task.{TimePoint, TaskInfo => RootTaskInfo} import dagr.core.tasksystem.{Pipeline, ShellCommand, Task, UnitTask} +import dagr.core.execsystem2.TaskStatus._ import scala.concurrent.Future import scala.concurrent.duration.Duration diff --git a/core/src/test/scala/dagr/core/execsystem2/TaskInfoTest.scala b/core/src/test/scala/dagr/core/execsystem2/TaskInfoTest.scala index 0c074094..e79263cb 100644 --- a/core/src/test/scala/dagr/core/execsystem2/TaskInfoTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/TaskInfoTest.scala @@ -30,6 +30,7 @@ import java.time.Instant import dagr.core.UnitSpec import dagr.core.tasksystem.{NoOpInJvmTask, Task} import org.scalatest.OptionValues +import dagr.core.execsystem2.TaskStatus._ class TaskInfoTest extends UnitSpec with OptionValues { private def task: Task = new NoOpInJvmTask("name") diff --git a/core/src/test/scala/dagr/core/execsystem2/TaskLoggerTest.scala b/core/src/test/scala/dagr/core/execsystem2/TopLikeStatusReporterTest.scala similarity index 91% rename from core/src/test/scala/dagr/core/execsystem2/TaskLoggerTest.scala rename to core/src/test/scala/dagr/core/execsystem2/TopLikeStatusReporterTest.scala index d9d5a58f..40c922ec 100644 --- a/core/src/test/scala/dagr/core/execsystem2/TaskLoggerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/TopLikeStatusReporterTest.scala @@ -31,10 +31,12 @@ import com.fulcrumgenomics.commons.util.Logger import dagr.core.FutureUnitSpec import dagr.core.execsystem2.local.LocalTaskExecutor import dagr.core.tasksystem.NoOpInJvmTask +import dagr.core.execsystem2.TaskStatus._ +import dagr.core.tasksystem.Task -class TaskLoggerTest extends FutureUnitSpec { +class TopLikeStatusReporterTest extends FutureUnitSpec { - private def toLoggerOutputStream(): ByteArrayOutputStream = { + private def toLoggerOutputStream: ByteArrayOutputStream = { val loggerOutputStream = new ByteArrayOutputStream() val loggerPrintStream = new PrintStream(loggerOutputStream) Logger.out = loggerPrintStream @@ -46,10 +48,12 @@ class TaskLoggerTest extends FutureUnitSpec { val taskExecutor = new LocalTaskExecutor() val reporter = new TopLikeStatusReporter( systemResources = taskExecutor.resources, - loggerOut = Some(toLoggerOutputStream()), - print = s => System.out.print(s) + loggerOut = Some(toLoggerOutputStream), + print = s => Unit ) + Task.TaskInfo.withLogger(reporter) + new TaskInfo(task, Pending) // running diff --git a/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskExecutorTest.scala b/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskExecutorTest.scala index 08b12477..f723ab89 100644 --- a/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskExecutorTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskExecutorTest.scala @@ -34,6 +34,7 @@ import dagr.core.execsystem2._ import dagr.core.tasksystem._ import com.fulcrumgenomics.commons.CommonsDef.yieldAndThen import dagr.core.exec.{Cores, Memory, Resource, ResourceSet} +import dagr.core.execsystem2.TaskStatus._ import scala.concurrent.{CancellationException, Future} import scala.concurrent.duration.Duration diff --git a/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskRunnerTest.scala b/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskRunnerTest.scala index 8367068f..8179e7cd 100644 --- a/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskRunnerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/local/LocalTaskRunnerTest.scala @@ -30,7 +30,8 @@ import java.nio.file.{Files, Path} import com.fulcrumgenomics.commons.CommonsDef.FilePath import dagr.core.FutureUnitSpec import dagr.core.exec.ResourceSet -import dagr.core.execsystem2.{Running, TaskInfo} +import dagr.core.execsystem2.TaskInfo +import dagr.core.execsystem2.TaskStatus.Running import dagr.core.tasksystem._ import scala.concurrent.{Await, CancellationException, TimeoutException} diff --git a/core/src/test/scala/dagr/core/reporting/TopLikeStatusReporterTest.scala b/core/src/test/scala/dagr/core/reporting/TopLikeStatusReporterTest.scala index c0eba10e..a6f1707a 100644 --- a/core/src/test/scala/dagr/core/reporting/TopLikeStatusReporterTest.scala +++ b/core/src/test/scala/dagr/core/reporting/TopLikeStatusReporterTest.scala @@ -25,10 +25,14 @@ package dagr.core.reporting +import java.io.{ByteArrayOutputStream, PrintStream} + +import com.fulcrumgenomics.commons.CommonsDef.yieldAndThen import com.fulcrumgenomics.commons.util.{CaptureSystemStreams, LogLevel, Logger} import dagr.core.UnitSpec -import dagr.core.execsystem.{GraphNodeState, SystemResources, TaskManager, TaskManagerReporter} +import dagr.core.execsystem.{GraphNodeState, SystemResources, TaskManager} import dagr.core.reporting.Terminal.Dimensions +import dagr.core.tasksystem.Task.TaskInfo import dagr.core.tasksystem.{NoOpInJvmTask, SimpleInJvmTask} import org.scalatest.BeforeAndAfterAll @@ -62,41 +66,71 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with private def getDefaultTaskManager(sleepMilliseconds: Int = 10): TaskManager = new TaskManager( taskManagerResources = SystemResources.infinite, scriptsDirectory = None, - sleepMilliseconds = sleepMilliseconds + sleepMilliseconds = sleepMilliseconds, + failFast = true ) + private def getDefaultReporter(taskManager: TaskManager, print: String => Unit) = { + val loggerOutputStream: ByteArrayOutputStream = { + val outStream = new ByteArrayOutputStream() + val printStream = new PrintStream(outStream) + Logger.out = printStream + outStream + } + + val reporter = new dagr.core.execsystem.TopLikeStatusReporter( + taskManager = taskManager, + loggerOut = Some(loggerOutputStream), + print = print + ) with TestTerminal + + yieldAndThen(reporter)(TaskInfo.withLogger(reporter)) + } + + private def getTwoLineReporter(taskManager: TaskManager, print: String => Unit) = { + val loggerOutputStream: ByteArrayOutputStream = { + val outStream = new ByteArrayOutputStream() + val printStream = new PrintStream(outStream) + Logger.out = printStream + outStream + } + + val reporter = new dagr.core.execsystem.TopLikeStatusReporter( + taskManager = taskManager, + loggerOut = Some(loggerOutputStream), + print = print + ) with TwoLineTestTerminal + + yieldAndThen(reporter)(TaskInfo.withLogger(reporter)) + } + "Terminal" should "support ANSI codes" in { Terminal.supportsAnsi shouldBe true } - "TopLikeStatusReporter" should "start and shutdown cleanly when no tasks are being managed" in { + "TopLikeStatusReporter" should "not print tasks when no tasks are being managed" in { val taskManager = getDefaultTaskManager() val output = new StringBuilder() - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TestTerminal, print = (str: String) => output.append(str)) - reporter.start() - reporter.shutdown() + getDefaultReporter(taskManager, print = (str: String) => output.append(str)) output should not be 'empty + output.split('\n').length shouldBe 5 } - it should "start and shutdown cleanly when no tasks are being managed with a two line terminal" in { + it should "not print tasks when no tasks are being managed with a two line terminal" in { val taskManager = getDefaultTaskManager() val output = new StringBuilder() - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TwoLineTestTerminal, print = (str: String) => output.append(str)) - reporter.start() - reporter.shutdown() + getTwoLineReporter(taskManager, print = (str: String) => output.append(str)) output should not be 'empty output.split('\n').length shouldBe 2 output.toString() should include("with 4 more lines not shown") } - it should "start and shutdown cleanly when a single task is being managed" in { + it should "print the output when a single task is being managed" in { val taskManager = getDefaultTaskManager() val output = new StringBuilder() - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TestTerminal, print = (str: String) => output.append(str)) - reporter.start() + getDefaultReporter(taskManager, print = (str: String) => output.append(str)) taskManager.addTask(new NoOpInJvmTask("Exit0Task")) - taskManager.runToCompletion(failFast=true) - reporter.shutdown() + taskManager.runToCompletion() output should not be 'empty output.toString() should include("Exit0Task") output.toString() should include("1 Done") @@ -110,11 +144,9 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with val output = new StringBuilder() val printMethod: String => Unit = (str: String) => output.append(str) val taskManager = getDefaultTaskManager() - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TestTerminal, print = printMethod) - + getDefaultReporter(taskManager, print=printMethod) taskManager.addTask(taskOne) - taskManager.runToCompletion(failFast=true) - reporter.refresh() + taskManager.runToCompletion() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("0 Running") @@ -128,7 +160,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with val output = new StringBuilder() val printMethod: String => Unit = (str: String) => output.append(str) val taskManager = getDefaultTaskManager() - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TestTerminal, print = printMethod) + getDefaultReporter(taskManager, print=printMethod) taskManager.addTask(taskOne) taskManager.stepExecution() @@ -136,15 +168,13 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with Thread.sleep(10) taskManager.stepExecution() } - reporter.refresh() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("1 Running") output.toString() should include("0 Done") taskOne.completeTask = true - taskManager.runToCompletion(failFast=true) - reporter.refresh() + taskManager.runToCompletion() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("0 Running") @@ -162,7 +192,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with scriptsDirectory = None, sleepMilliseconds = 10 ) - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TestTerminal, print = printMethod) + getDefaultReporter(taskManager, print=printMethod) // add the first task, which can be scheduled. taskManager.addTasks(taskOne, taskTwo) @@ -171,7 +201,6 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with Thread.sleep(10) taskManager.stepExecution() } - reporter.refresh() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("TaskTwo") @@ -181,8 +210,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with taskOne.completeTask = true taskTwo.completeTask = true - taskManager.runToCompletion(failFast=true) - reporter.refresh() + taskManager.runToCompletion() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("TaskTwo") @@ -198,7 +226,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with val output = new StringBuilder() val printMethod: String => Unit = (str: String) => output.append(str) val taskManager = getDefaultTaskManager() - val reporter = new PeriodicRefreshingReporter(reporter = new TaskManagerReporter(taskManager=taskManager) with TestTerminal, print = printMethod) + getDefaultReporter(taskManager, print=printMethod) // add the first task, which can be scheduled. taskManager.addTasks(taskOne, taskTwo) @@ -207,7 +235,6 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with Thread.sleep(10) taskManager.stepExecution() } - reporter.refresh() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("TaskTwo") @@ -217,8 +244,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with taskOne.completeTask = true taskTwo.completeTask = true - taskManager.runToCompletion(failFast=true) - reporter.refresh() + taskManager.runToCompletion() output should not be 'empty output.toString() should include("TaskOne") output.toString() should include("TaskTwo") diff --git a/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala b/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala index 222d4c1e..e61b319e 100644 --- a/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala +++ b/tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala @@ -46,7 +46,7 @@ import org.scalatest.BeforeAndAfterAll class ScatterGatherTests extends UnitSpec with LazyLogging with BeforeAndAfterAll { override def beforeAll(): Unit = Logger.level = LogLevel.Fatal override def afterAll(): Unit = Logger.level = LogLevel.Info - def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None, sleepMilliseconds=1) + def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None, sleepMilliseconds=1, failFast=true) def tmp(): Path = { val path = Files.createTempFile("testScatterGather.", ".txt") @@ -90,7 +90,7 @@ class ScatterGatherTests extends UnitSpec with LazyLogging with BeforeAndAfterAl val pipeline = new Pipeline() { override def build(): Unit = { - val scatter = Scatter(new SplitByLine(input=input)) + val scatter = Scatter(SplitByLine(input=input)) val counts = scatter.map(p => CountWords(input=p, output=tmp())) counts.gather(cs => SumNumbers(inputs=cs.map(_.output), output=sumOfCounts)) @@ -103,7 +103,7 @@ class ScatterGatherTests extends UnitSpec with LazyLogging with BeforeAndAfterAl val taskManager = buildTaskManager taskManager.addTask(pipeline) - taskManager.runToCompletion(true) + taskManager.runToCompletion() val sum1 = Io.readLines(sumOfCounts).next().toInt val sum2 = Io.readLines(sumOfSquares).next().toInt