Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Support replay and a major refactor.
Browse files Browse the repository at this point in the history
Refactor task and exec system prior to new execution system.
  • Loading branch information
nh13 committed Jun 11, 2017
1 parent 4979ccb commit 78cb922
Show file tree
Hide file tree
Showing 82 changed files with 1,960 additions and 801 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ lazy val commonSettings = Seq(
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", Option(System.getenv("TEST_HTML_REPORTS")).getOrElse(htmlReportsDirectory)),
testOptions in Test += Tests.Argument("-l", "LongRunningTest"), // ignores long running tests
// uncomment for full stack traces
//testOptions in Test += Tests.Argument("-oDF"),
testOptions in Test += Tests.Argument("-oDF"),
fork in Test := true,
resolvers += Resolver.jcenterRepo,
resolvers += Resolver.sonatypeRepo("public"),
Expand Down Expand Up @@ -123,6 +123,7 @@ lazy val core = Project(id="dagr-core", base=file("core"))
"com.fulcrumgenomics" %% "commons" % "0.2.0-SNAPSHOT",
"com.fulcrumgenomics" %% "sopt" % "0.2.0-SNAPSHOT",
"com.github.dblock" % "oshi-core" % "3.3",
"com.beachape" %% "enumeratum" % "1.5.12",
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scala-lang" % "scala-compiler" % scalaVersion.value,
"org.reflections" % "reflections" % "0.9.10",
Expand Down
107 changes: 67 additions & 40 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,27 @@
*/
package dagr.core.cmdline

import java.io.{ByteArrayOutputStream, PrintStream, PrintWriter}
import java.io.PrintWriter
import java.net.InetAddress
import java.nio.file.{Files, Path}
import java.text.DecimalFormat

import com.fulcrumgenomics.commons.CommonsDef.unreachable
import com.fulcrumgenomics.commons.CommonsDef.{FilePath, unreachable}
import com.fulcrumgenomics.commons.io.{Io, PathUtil}
import com.fulcrumgenomics.commons.util.{LazyLogging, LogLevel, Logger}
import com.fulcrumgenomics.sopt.Sopt.CommandSuccess
import com.fulcrumgenomics.sopt.cmdline.{CommandLineParser, CommandLineProgramParserStrings, ValidationException}
import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, ArgTokenizer, OptionParser}
import com.fulcrumgenomics.sopt.cmdline.{CommandLineProgramParserStrings, ValidationException}
import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, ArgTokenizer}
import com.fulcrumgenomics.sopt.util.TermCode
import com.fulcrumgenomics.sopt.{OptionName, Sopt, arg}
import com.fulcrumgenomics.sopt.{Sopt, arg}
import dagr.core.config.Configuration
import dagr.core.exec.{Cores, Executor, Memory, TaskCache}
import dagr.core.execsystem._
import dagr.core.reporting.{ExecutionLogger, Terminal, TopLikeStatusReporter}
import dagr.core.tasksystem.Pipeline

import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext
import scala.util.Success

object DagrCoreMain extends Configuration {
/** The packages we wish to include in our command line **/
Expand All @@ -52,7 +54,9 @@ object DagrCoreMain extends Configuration {

/** The main method */
/** The main method */
def main(args: Array[String]): Unit = new DagrCoreMain().makeItSoAndExit(args)
def main(args: Array[String]): Unit = {
new DagrCoreMain().makeItSoAndExit(args)
}

/** Provide a command line validation error message */
private[cmdline] def buildErrorMessage(msgOption: Option[String] = None, exceptionOption: Option[Exception] = None): String = {
Expand Down Expand Up @@ -103,11 +107,13 @@ class DagrCoreArgs(
@arg(doc = "Write an execution report to this file, otherwise write to the stdout")
val report: Option[Path] = None,
@arg(doc = "Provide an top-like interface for tasks with the give delay in seconds. This suppress info logging.")
var interactive: Boolean = false
var interactive: Boolean = false,
@arg(doc = "Attempt to replay using the provided replay log")
val replayLog: Option[FilePath] = None
) extends LazyLogging {

// These are not optional, but are only populated during configure()
private var taskManager : Option[TaskManager] = None
private var executor : Option[Executor] = None
private var reportPath : Option[Path] = None

// Initialize the configuration as early as possible
Expand All @@ -121,11 +127,11 @@ class DagrCoreArgs(
/** Try to create a given directory, and if there is an exception, write the path since some exceptions can be obtuse */
private def mkdir(dir: Path, use: String, errors: ListBuffer[String]): Unit = {
try { Files.createDirectories(dir) }
catch { case e: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
catch { case _: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
}

// Invoked by DagrCommandLineParser after the pipeline has also been instantiated
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None) : Unit = {
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = {
try {
val config = new Configuration { }

Expand All @@ -149,7 +155,13 @@ class DagrCoreArgs(
this.reportPath.foreach(p => Io.assertCanWriteFile(p, parentMustExist=false))

val resources = SystemResources(cores = cores.map(Cores(_)), totalMemory = memory.map(Memory(_)))
this.taskManager = Some(new TaskManager(taskManagerResources=resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
this.executor = Some(
Executor(
resources = resources,
scriptsDirectory = scriptsDirectory,
logDirectory = logDirectory
)
)

// Print all the arguments if desired.
commandLine.foreach { line =>
Expand All @@ -168,52 +180,67 @@ class DagrCoreArgs(
* Attempts to set up the various directories needed to execute the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline): Int = {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

val interactiveReporter: Option[TopLikeStatusReporter] = interactive match {
case true if !Terminal.supportsAnsi =>
// Get the executor
val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()"))

// Set up an interactive logger if desired and supported
if (this.interactive) {
if (Terminal.supportsAnsi) {
executor.withLogger(TopLikeStatusReporter(executor))
}
else {
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
interactive = false
None
case true =>
val loggerOutputStream = new ByteArrayOutputStream()
val loggerPrintStream = new PrintStream(loggerOutputStream)
Logger.out = loggerPrintStream
Some(new TopLikeStatusReporter(taskMan, Some(loggerOutputStream), print = (s: String) => System.out.print(s)))
case false => None
}
}
interactiveReporter.foreach(_.start())

taskMan.addTask(pipeline)
taskMan.runToCompletion(this.failFast)
// Set up the execution logger whose output can be used later for replay
{
val logName = "replay_log.csv"
val log = if (Seq(Io.StdOut, PathUtil.pathTo("/dev/stderr"), Io.DevNull).contains(report)) {
executor.logDir.resolve(logName)
}
else {
report.getParent.resolve(logName)
}
val executionLogger = new ExecutionLogger(log)
executor.withLogger(executionLogger)
executor.withTaskRegister(executionLogger)
}

// Set up the task cache (in case of replay)
this.replayLog.foreach { log =>
executor.withTaskCache(TaskCache(log))
}

// execute
val exitCode = executor.execute(pipeline)

// Write out the execution report
if (!interactive || Io.StdOut != report) {
val pw = new PrintWriter(Io.toWriter(report))
taskMan.logReport({ str: String => pw.write(str + "\n") })
executor.logReport({ str: String => pw.write(str + "\n") })
pw.close()
}

interactiveReporter.foreach(_.shutdown())

// return an exit code based on the number of non-completed tasks
taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.isTaskNotDone(info.status, failedIsDone=false)
}
exitCode
}

}

class DagrCoreMain extends LazyLogging {
protected def name: String = "dagr"

/** A main method that invokes System.exit with the exit code. */
def makeItSoAndExit(args: Array[String]): Unit = System.exit(makeItSo(args))
def makeItSoAndExit(args: Array[String]): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
System.exit(makeItSo(args))
}

/** A main method that returns an exit code instead of exiting. */
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false): Int = {
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false)
(implicit ex: ExecutionContext): Int = {
// Initialize color options
TermCode.printColor = DagrCoreMain.optionallyConfigure[Boolean](Configuration.Keys.ColorStatus).getOrElse(true)

Expand All @@ -226,7 +253,7 @@ class DagrCoreMain extends LazyLogging {
case Sopt.Failure(usage) =>
System.err.print(usage())
1
case Sopt.CommandSuccess(cmd) =>
case Sopt.CommandSuccess(_) =>
unreachable("CommandSuccess should never be returned by parseCommandAndSubCommand.")
case Sopt.SubcommandSuccess(dagr, pipeline) =>
val name = pipeline.getClass.getSimpleName
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/dagr/core/config/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import java.io.File
import java.nio.file.{Files, Path}
import java.time.Duration

import com.typesafe.config.{ConfigParseOptions, ConfigFactory, Config}
import com.typesafe.config.ConfigException.Generic
import com.fulcrumgenomics.commons.io.PathUtil._
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.execsystem.{Cores, Memory}
import com.typesafe.config.ConfigException.Generic
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import dagr.core.exec.{Cores, Memory}

import scala.collection.JavaConversions._
import scala.collection.SortedSet
import scala.reflect.runtime.universe.{TypeTag, typeOf}
import collection.JavaConversions._

/**
* Companion object to the Configuration trait that keeps track of all configuration keys
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/scala/dagr/core/exec/ExecDef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.exec

import scala.collection.mutable

object ExecDef {
  /** Creates a new, empty, thread-safe mutable map.
    *
    * The returned value is a Scala wrapper around a freshly allocated
    * `java.util.concurrent.ConcurrentHashMap`; all reads and writes are delegated to it.
    *
    * @tparam A the key type
    * @tparam B the value type
    * @return an empty mutable map backed by a `ConcurrentHashMap`
    */
  def concurrentMap[A, B](): mutable.Map[A, B] = {
    // `JavaConverters` is the supported entry point for the `.asScala` decorators; the
    // `scala.collection.convert.decorateAsScala` alias is deprecated as of Scala 2.12.
    import scala.collection.JavaConverters._
    new java.util.concurrent.ConcurrentHashMap[A, B]().asScala
  }

  /** Creates a new, empty, thread-safe mutable set.
    *
    * The set is a Scala wrapper around the Java set view produced by
    * `java.util.Collections.newSetFromMap` over a freshly allocated `ConcurrentHashMap`.
    *
    * @tparam A the element type
    * @return an empty mutable set backed by a `ConcurrentHashMap`
    */
  def concurrentSet[A](): mutable.Set[A] = {
    import scala.collection.JavaConverters._
    // The backing map must be empty and must not be touched directly once handed to `newSetFromMap`.
    val backing = new java.util.concurrent.ConcurrentHashMap[A, java.lang.Boolean]()
    val javaSet: java.util.Set[A] = java.util.Collections.newSetFromMap[A](backing)
    javaSet.asScala
  }
}
105 changes: 105 additions & 0 deletions core/src/main/scala/dagr/core/exec/Executor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.exec

import com.fulcrumgenomics.commons.CommonsDef.{DirPath, yieldAndThen}
import dagr.core.execsystem.{SystemResources, TaskManager}
import dagr.core.reporting.ReportingDef.{TaskLogger, TaskRegister}
import dagr.core.reporting.{FinalStatusReporter, TaskStatusLogger}
import dagr.core.tasksystem.Task
import dagr.core.tasksystem.Task.{TaskInfo, TaskStatus}

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext

object Executor {
  /** Builds a new [[Executor]].
    *
    * The executor returned is a [[TaskManager]] constructed from the given arguments.
    *
    * @param resources        the system resources handed to the task manager
    * @param scriptsDirectory the optional scripts directory handed to the task manager
    * @param logDirectory     the optional log directory handed to the task manager
    * @param ex               the execution context in implicit scope while constructing the task manager
    * @return the newly created executor
    */
  def apply(resources: SystemResources,
            scriptsDirectory: Option[DirPath],
            logDirectory: Option[DirPath]
           )(implicit ex: ExecutionContext): Executor = {
    val manager: Executor = new TaskManager(
      taskManagerResources = resources,
      scriptsDirectory     = scriptsDirectory,
      logDirectory         = logDirectory
    )
    manager
  }
}

/** All executors of tasks should extend this trait.
  *
  * The trait owns the bookkeeping shared by every executor: the task caches, the loggers that are told about
  * task status changes, and the registers that are told about parent/child task relationships.  Every method
  * defined here that reads or mutates those lists does so while holding this executor's monitor.
  */
trait Executor extends FinalStatusReporter {

  /** A list of [[TaskCache]] to use to determine if a task should be manually succeeded. */
  protected val taskCaches: ListBuffer[TaskCache] = ListBuffer[TaskCache]()

  /** The loggers to be notified when a task's status is updated. */
  private val _loggers: ListBuffer[TaskLogger] = ListBuffer[TaskLogger](new TaskStatusLogger)

  /** The registers to be notified when a parent task and its child tasks are registered. */
  private val _registers: ListBuffer[TaskRegister] = ListBuffer[TaskRegister]()

  /** Record that the task information has changed for a task, by forwarding it to every logger.
    *
    * @param info the task information to pass to each logger
    */
  final def record(info: TaskInfo): Unit = this.synchronized {
    this._loggers.foreach(_.record(info=info))
  }

  /** The method that will be called on the result of `Task.getTasks`; forwards to every register.
    *
    * @param parent the parent task
    * @param child  the tasks to register against the parent
    */
  final def register(parent: Task, child: Task*): Unit = this.synchronized {
    this._registers.foreach(_.register(parent, child:_*))
  }

  /** Adds the [[dagr.core.reporting.ReportingDef.TaskLogger]] to the list of loggers to be notified when a
    * task's status is updated.  A logger that is already present is not added a second time.
    *
    * @param logger the logger to add
    */
  final def withLogger(logger: TaskLogger): Unit = this.synchronized {
    if (!this._loggers.contains(logger)) {
      this._loggers.append(logger)
    }
  }

  /** Adds the [[TaskRegister]] to the list of registers to be notified when a list of tasks is returned
    * by [[Task.getTasks]].
    *
    * @param register the register to add
    * @return this executor, to allow chaining
    */
  final def withTaskRegister(register: TaskRegister): this.type = this.synchronized {
    yieldAndThen[this.type](this)(this._registers.append(register))
  }

  /** Adds the [[TaskCache]] to the list of caches to use to determine if a task should be manually succeeded.
    * The cache is also added as a task register.
    *
    * @param taskCache the cache to add
    * @return this executor, to allow chaining
    */
  final def withTaskCache(taskCache: TaskCache): this.type = this.synchronized {
    // Hold the monitor while mutating `taskCaches`, as the other `with*` methods do for their lists.  The
    // nested `withTaskRegister` call re-acquires the same monitor, which is safe as JVM monitors are reentrant.
    yieldAndThen[this.type](this) {
      this.withTaskRegister(taskCache)
      this.taskCaches.append(taskCache)
    }
  }

  /** Start the execution of this task and all tasks that depend on it.  Returns the number of tasks that were not
    * executed.  A given task should only be attempted once.
    *
    * The task is first bound to this executor and registered as its own parent and child, and then handed to
    * `_execute`.
    *
    * @param task the task to execute
    * @return the value returned by `_execute`
    */
  final def execute(task: Task): Int = {
    task._executor = Some(this)
    this.register(task, task)
    this._execute(task=task)
  }

  /** Returns the task status by ordinal */
  def from(ordinal: Int): TaskStatus

  /** Returns the log directory. */
  def logDir: DirPath

  /** Start the execution of this task and all tasks that depend on it.  Returns the number of tasks that were not
    * executed.  A given task should only be attempted once.  All executors should implement this method. */
  protected def _execute(task: Task): Int
}
Loading

0 comments on commit 78cb922

Please sign in to comment.