Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Major refactor again.
Browse files Browse the repository at this point in the history
Starting support for replay in the main task system (shouldn't be too much more effort)

Test should be failing at the moment!
  • Loading branch information
nh13 committed Jun 10, 2017
1 parent 6e35f52 commit db43946
Show file tree
Hide file tree
Showing 28 changed files with 372 additions and 300 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ lazy val core = Project(id="dagr-core", base=file("core"))
"com.fulcrumgenomics" %% "commons" % "0.2.0-SNAPSHOT",
"com.fulcrumgenomics" %% "sopt" % "0.2.0-SNAPSHOT",
"com.github.dblock" % "oshi-core" % "3.3",
"com.beachape" %% "enumeratum" % "1.5.12",
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scala-lang" % "scala-compiler" % scalaVersion.value,
"org.reflections" % "reflections" % "0.9.10",
Expand Down
94 changes: 31 additions & 63 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
package dagr.core.cmdline

import java.io.{ByteArrayOutputStream, PrintStream, PrintWriter}
import java.io.PrintWriter
import java.net.InetAddress
import java.nio.file.{Files, Path}
import java.text.DecimalFormat
Expand All @@ -36,14 +36,13 @@ import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, A
import com.fulcrumgenomics.sopt.util.TermCode
import com.fulcrumgenomics.sopt.{Sopt, arg}
import dagr.core.config.Configuration
import dagr.core.exec.{Cores, Memory}
import dagr.core.exec.{Cores, Executor, Memory, TaskCache}
import dagr.core.execsystem._
import dagr.core.execsystem2.GraphExecutor
import dagr.core.execsystem2.{TopLikeStatusReporter => TopLikeStatusReporter2}
import dagr.core.execsystem2.local.LocalTaskExecutor
import dagr.core.execsystem2.replay.TaskCache
import dagr.core.reporting.{ExecutionLogger, FinalStatusReporter, PeriodicRefreshingReporter, Terminal}
import dagr.core.reporting.{ExecutionLogger, Terminal, TopLikeStatusReporter}
import dagr.core.tasksystem.Pipeline
import dagr.core.tasksystem.Task.TaskInfo

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -119,8 +118,7 @@ class DagrCoreArgs(
) extends LazyLogging {

// These are not optional, but are only populated during configure()
private var taskManager : Option[TaskManager] = None
private var taskExecutor: Option[LocalTaskExecutor] = None
private var executor : Option[Executor] = None
private var reportPath : Option[Path] = None

// Initialize the configuration as early as possible
Expand Down Expand Up @@ -163,10 +161,11 @@ class DagrCoreArgs(

val resources = SystemResources(cores = cores.map(Cores(_)), totalMemory = memory.map(Memory(_)))
if (experimentalExecution) {
this.taskExecutor = Some(new LocalTaskExecutor(systemResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
val taskExecutor = new LocalTaskExecutor(systemResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory)
this.executor = Some( GraphExecutor(taskExecutor))
}
else {
this.taskManager = Some(new TaskManager(taskManagerResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
this.executor = Some(new TaskManager(taskManagerResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
}

// Print all the arguments if desired.
Expand All @@ -188,72 +187,41 @@ class DagrCoreArgs(
*/
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))
// FIXME: should this path be exposed on the command line?
val executionLog = {
if (report == Io.StdOut.toAbsolutePath) None // FIXME
else Some(report.getParent.resolve("execution_log.txt"))
}

if (interactive && !Terminal.supportsAnsi) {
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
interactive = false
None
}

def toLoggerOutputStream(): ByteArrayOutputStream = {
val loggerOutputStream = new ByteArrayOutputStream()
val loggerPrintStream = new PrintStream(loggerOutputStream)
Logger.out = loggerPrintStream
loggerOutputStream
}
// Get the executor
val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()"))

val (finalStatusReporter: FinalStatusReporter, exitCode: Int) = if (experimentalExecution) {
val taskExecutor = this.taskExecutor.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val graphExecutor = GraphExecutor(taskExecutor)
if (interactive) {
val reporter = new TopLikeStatusReporter2(
systemResources = taskExecutor.resources,
loggerOut = Some(toLoggerOutputStream()),
print = s => System.out.print(s)
)
graphExecutor.withLogger(reporter)
// Set up an interactive logger if desired and supported
if (this.interactive) {
if (Terminal.supportsAnsi) {
TaskInfo.withLogger(TopLikeStatusReporter(executor))
}
executionLog.foreach { log =>
val executionLogger = new ExecutionLogger(log)
graphExecutor.withLogger(executionLogger)
graphExecutor.withTaskRegister(executionLogger)
}
this.replayLog.foreach { log =>
val taskCache = TaskCache(log)
graphExecutor.withTaskCache(taskCache)
else {
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
}
(graphExecutor, graphExecutor execute pipeline)
}
else {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))

val interactiveReporter: Option[PeriodicRefreshingReporter] = if (!interactive) { None } else {
val reporter = new TaskManagerReporter(taskManager=taskMan)
Some(new PeriodicRefreshingReporter(reporter=reporter, loggerOut=Some(toLoggerOutputStream()), print = s => System.out.print(s)))
}
interactiveReporter.foreach(_.start())

taskMan.addTask(pipeline)
taskMan.runToCompletion(this.failFast)

interactiveReporter.foreach(_.shutdown())
// Set up the execution logger
// FIXME: should this path be exposed on the command line?
if (!Seq(Io.StdOut, Io.DevNull).map(_.toAbsolutePath).contains(report)) {
val log = report.getParent.resolve("execution_log.txt")
val executionLogger = new ExecutionLogger(log)
TaskInfo.withLogger(executionLogger)
executor.withTaskRegister(executionLogger)
}

// return an exit code based on the number of non-completed tasks
val code = taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.notDone(info.status, failedIsDone=false)
}
(taskMan, code)
// Set up the task cache (in case of replay)
this.replayLog.foreach { log =>
executor.withTaskCache(TaskCache(log))
}

// execute
val exitCode = executor.execute(pipeline)

// Write out the execution report
if (!interactive || Io.StdOut != report) {
val pw = new PrintWriter(Io.toWriter(report))
finalStatusReporter.logReport({ str: String => pw.write(str + "\n") })
executor.logReport({ str: String => pw.write(str + "\n") })
pw.close()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -23,11 +23,11 @@
*
*/

package dagr.core.execsystem2
package dagr.core.exec

import scala.collection.mutable

private[execsystem2] object ExecDef {
object ExecDef {
/** Create a thread-safe mutable map. */
def concurrentMap[A,B](): mutable.Map[A,B] = {
import scala.collection.convert.decorateAsScala._
Expand Down
57 changes: 57 additions & 0 deletions core/src/main/scala/dagr/core/exec/Executor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.exec

import dagr.core.execsystem2.TaskStatusLogger
import dagr.core.reporting.FinalStatusReporter
import dagr.core.reporting.ReportingDef.{TaskLogger, TaskRegister}
import dagr.core.tasksystem.Task
import com.fulcrumgenomics.commons.CommonsDef.yieldAndThen

import scala.collection.mutable.ListBuffer

/** Base trait that every executor of tasks extends.
  *
  * Besides declaring [[execute]], it stores the [[TaskRegister]]s and [[TaskCache]]s that callers attach through
  * the fluent `with*` methods, each of which returns this executor so that calls can be chained.
  */
trait Executor extends FinalStatusReporter {

  /** The [[TaskRegister]]s that will be notified when a list of tasks is returned by [[Task.getTasks]]. */
  protected val registers: ListBuffer[TaskRegister] = ListBuffer.empty[TaskRegister]

  /** The [[TaskCache]]s consulted to determine if a task should be manually succeeded. */
  protected val taskCaches: ListBuffer[TaskCache] = ListBuffer.empty[TaskCache]

  /** Appends the given [[TaskRegister]] to the registers to be notified when a list of tasks is returned by
    * [[Task.getTasks]].
    *
    * @param register the register to add
    * @return this executor
    */
  def withTaskRegister(register: TaskRegister): this.type = {
    this.registers += register
    this
  }

  /** Appends the given [[TaskCache]] to the caches used to determine if a task should be manually succeeded.
    *
    * The cache is first added as a [[TaskRegister]] (via [[withTaskRegister]]) and then stored in the list of caches.
    *
    * @param taskCache the cache to add
    * @return this executor
    */
  def withTaskCache(taskCache: TaskCache): this.type = {
    withTaskRegister(taskCache)
    this.taskCaches += taskCache
    this
  }

  /** Starts the execution of the given task and all tasks that depend on it.  A given task should only be
    * attempted once.
    *
    * @param task the task to execute
    * @return the number of tasks that were not executed
    */
  def execute(task: Task): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
*
*/

package dagr.core.execsystem2.replay
package dagr.core.exec

import com.fulcrumgenomics.commons.CommonsDef.FilePath
import com.fulcrumgenomics.commons.io.Io
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.execsystem2.{ManuallySucceeded, SucceededExecution}
import dagr.core.reporting.ExecutionLogger.{Definition, Relationship, Status}
import dagr.core.reporting.ReportingDef.TaskRegister
import dagr.core.tasksystem.Task
Expand Down Expand Up @@ -159,12 +158,13 @@ class SimpleTaskCache(replayLog: FilePath) extends TaskCache with LazyLogging {

/** Updates tasksToExecute if the task should be executed. */
private def maybeSetTaskToExecute(task: Task, taskReplayDefinition: Definition): Unit = {
val status = this.statuses.filter { status => status.definitionCode == taskReplayDefinition.code }
val succeededExecution = this.statuses.filter { status => status.definitionCode == taskReplayDefinition.code }
.sortBy(-_.statusOrdinal)
.map(status => task.taskInfo.status.from(status.statusOrdinal).success)
.headOption
.getOrElse(Status(taskReplayDefinition.code, "", -1)) // the OrElse occurs if no status was set
// NB: this is specific to execsystem2
if (status.statusOrdinal != SucceededExecution.ordinal && status.statusOrdinal != ManuallySucceeded.ordinal) {
.getOrElse(false)

if (succeededExecution) {
// execute it
logger.debug(s"Adding task ${task.name} to execute")
this.tasksToExecute += task
Expand Down
27 changes: 20 additions & 7 deletions core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import com.fulcrumgenomics.commons.io.{Io, PathUtil}
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.DagrDef._
import dagr.core.exec._
import dagr.core.reporting.FinalStatusReporter
import dagr.core.tasksystem._

/** The resources needed for the task manager */
Expand Down Expand Up @@ -110,11 +109,12 @@ object TaskManager extends LazyLogging {
logDirectory = logDirectory,
scheduler = scheduler.getOrElse(defaultScheduler),
simulate = simulate,
sleepMilliseconds = sleepMilliseconds
sleepMilliseconds = sleepMilliseconds,
failFast = failFast
)

taskManager.addTask(task = task)
taskManager.runToCompletion(failFast=failFast)
taskManager.runToCompletion()

taskManager.taskToInfoBiMapFor
}
Expand All @@ -136,8 +136,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
logDirectory: Option[Path] = None,
scheduler: Scheduler = TaskManagerDefaults.defaultScheduler,
simulate: Boolean = false,
sleepMilliseconds: Int = 1000
) extends TaskManagerLike with TaskTracker with FinalStatusReporter with LazyLogging {
sleepMilliseconds: Int = 1000,
failFast: Boolean = false
) extends TaskManagerLike with TaskTracker with Executor with LazyLogging {

private val actualScriptsDirectory = scriptsDirectory getOrElse Io.makeTempDir("scripts")
protected val actualLogsDirectory = logDirectory getOrElse Io.makeTempDir("logs")
Expand Down Expand Up @@ -549,7 +550,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
)
}

override def runToCompletion(failFast: Boolean): BiMap[Task, TaskExecutionInfo] = {
override def runToCompletion(): BiMap[Task, TaskExecutionInfo] = {
var allDone = false
while (!allDone) {
val (readyTasks, tasksToSchedule, runningTasks, _) = stepExecution()
Expand Down Expand Up @@ -582,7 +583,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de

allDone = true
}
else if (failFast && hasFailedTasks) {
else if (this.failFast && hasFailedTasks) {
allDone = true
}
}
Expand All @@ -609,4 +610,16 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de

taskToInfoBiMapFor
}

/** Adds the given task to this manager, runs to completion, and then counts the tracked tasks that
  * [[TaskStatus.notDone]] reports as not done when failed tasks are not treated as done.
  *
  * @param task the task to add and execute
  * @return the number of tracked tasks that are not done
  */
override def execute(task: Task): Int = {
  addTask(task)
  runToCompletion()
  // a failed task counts as "not done" here, so it contributes to the returned count
  val numNotDone = taskToInfoBiMapFor.count { case (_, executionInfo) =>
    TaskStatus.notDone(executionInfo.status, failedIsDone=false)
  }
  numNotDone
}

// TODO:
// support TaskRegister
// support TaskCache
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private[execsystem] trait TaskManagerLike {
*
* @return a bi-directional map from the set of tasks to their execution information.
*/
def runToCompletion(failFast: Boolean): BiMap[Task, TaskExecutionInfo]
def runToCompletion(): BiMap[Task, TaskExecutionInfo]

/** Run a single iteration of managing tasks.
*
Expand Down
33 changes: 21 additions & 12 deletions core/src/main/scala/dagr/core/execsystem/TaskStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,22 @@
*/
package dagr.core.execsystem

import dagr.core.execsystem.TaskStatus.{ManuallySucceeded, SucceededExecution}
import dagr.core.tasksystem.Task
import dagr.core.tasksystem.Task.{TaskStatus => RootTaskStatus}
import enumeratum.values.{IntEnum, IntEnumEntry}

sealed trait TaskStatus extends RootTaskStatus {
override def toString: String = description
sealed abstract class TaskStatus extends IntEnumEntry with Task.TaskStatus {
override def ordinal = this.value
/** Returns true if this status indicates any type of success, false otherwise. */
def success: Boolean = this.ordinal == SucceededExecution.ordinal || this.ordinal == ManuallySucceeded.ordinal
/** Returns the task status by ordinal */
def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal)
}

object TaskStatus {
case object TaskStatus extends IntEnum[TaskStatus] {
val values = findValues

/** Checks if a task with a given status is done.
*
* @param taskStatus the status of the task
Expand Down Expand Up @@ -64,17 +73,17 @@ object TaskStatus {
sealed trait Succeeded extends Completed

// High-level statuses
case object Unknown extends TaskStatus { val description: String = "is unknown"; val ordinal: Int = 0 }
case object Started extends TaskStatus { val description: String = "has been started"; val ordinal: Int = 1 }
case object Stopped extends Completed { val description: String = "has been stopped"; val ordinal: Int = 2 }
case object Unknown extends TaskStatus { val description: String = "is unknown"; val value: Int = 0 }
case object Started extends TaskStatus { val description: String = "has been started"; val value: Int = 1 }
case object Stopped extends Completed { val description: String = "has been stopped"; val value: Int = 2 }

// Statuses after execution has completed
case object FailedGetTasks extends Failed { val description: String = "has failed (could not get the list of tasks)"; val ordinal: Int = 3 }
case object FailedScheduling extends Failed { val description: String = "has failed (could not start executing after scheduling)"; val ordinal: Int = 4 }
case object FailedExecution extends Failed { val description: String = "has failed (execution)"; val ordinal: Int = 5 }
case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val ordinal: Int = 6 }
case object SucceededExecution extends Succeeded { val description: String = "has succeeded"; val ordinal: Int = 7 }
case object ManuallySucceeded extends Succeeded { val description: String = "has succeeded (manually)"; val ordinal: Int = 8 }
case object FailedGetTasks extends Failed { val description: String = "has failed (could not get the list of tasks)"; val value: Int = 3 }
case object FailedScheduling extends Failed { val description: String = "has failed (could not start executing after scheduling)"; val value: Int = 4 }
case object FailedExecution extends Failed { val description: String = "has failed (execution)"; val value: Int = 5 }
case object FailedOnComplete extends Failed { val description: String = "Failed during the onComplete callback"; val value: Int = 6 }
case object SucceededExecution extends Succeeded { val description: String = "has succeeded"; val value: Int = 7 }
case object ManuallySucceeded extends Succeeded { val description: String = "has succeeded (manually)"; val value: Int = 8 }

val TaskStatuses = Seq(Unknown, Started, Stopped, FailedGetTasks, FailedScheduling, FailedExecution, FailedOnComplete, SucceededExecution, ManuallySucceeded)
}
Loading

0 comments on commit db43946

Please sign in to comment.