Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Adding a new execution system.
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed May 19, 2017
1 parent 81021e8 commit 63b65b5
Show file tree
Hide file tree
Showing 25 changed files with 4,187 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ lazy val commonSettings = Seq(
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", Option(System.getenv("TEST_HTML_REPORTS")).getOrElse(htmlReportsDirectory)),
testOptions in Test += Tests.Argument("-l", "LongRunningTest"), // ignores long running tests
// uncomment for full stack traces
//testOptions in Test += Tests.Argument("-oDF"),
testOptions in Test += Tests.Argument("-oDF"),
fork in Test := true,
resolvers += Resolver.jcenterRepo,
resolvers += Resolver.sonatypeRepo("public"),
Expand Down
61 changes: 44 additions & 17 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ import com.fulcrumgenomics.sopt.{Sopt, arg}
import dagr.core.config.Configuration
import dagr.core.exec.{Cores, Memory}
import dagr.core.execsystem._
import dagr.core.execsystem2.GraphExecutor
import dagr.core.execsystem2.{TopLikeStatusReporter => TopLikeStatusReporter2}
import dagr.core.execsystem2.local.LocalTaskExecutor
import dagr.core.reporting.{FinalStatusReporter, PeriodicRefreshingReporter, Terminal}
import dagr.core.tasksystem.Pipeline

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Success

object DagrCoreMain extends Configuration {
Expand All @@ -53,7 +57,9 @@ object DagrCoreMain extends Configuration {

/** The main method */
/** The main method */
def main(args: Array[String]): Unit = new DagrCoreMain().makeItSoAndExit(args)
def main(args: Array[String]): Unit = {
new DagrCoreMain().makeItSoAndExit(args)
}

/** Provide a command line validation error message */
private[cmdline] def buildErrorMessage(msgOption: Option[String] = None, exceptionOption: Option[Exception] = None): String = {
Expand Down Expand Up @@ -104,11 +110,14 @@ class DagrCoreArgs(
@arg(doc = "Write an execution report to this file, otherwise write to the stdout")
val report: Option[Path] = None,
@arg(doc = "Provide an top-like interface for tasks with the give delay in seconds. This suppress info logging.")
var interactive: Boolean = false
var interactive: Boolean = false,
@arg(doc = "Use the experimental execution system.")
val experimentalExecution: Boolean = false
) extends LazyLogging {

// These are not optional, but are only populated during configure()
private var taskManager : Option[TaskManager] = None
private var taskExecutor: Option[LocalTaskExecutor] = None
private var reportPath : Option[Path] = None

// Initialize the configuration as early as possible
Expand All @@ -122,11 +131,11 @@ class DagrCoreArgs(
/** Try to create a given directory, and if there is an exception, write the path since some exceptions can be obtuse */
private def mkdir(dir: Path, use: String, errors: ListBuffer[String]): Unit = {
try { Files.createDirectories(dir) }
catch { case e: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
catch { case _: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
}

// Invoked by DagrCommandLineParser after the pipeline has also been instantiated
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None) : Unit = {
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = {
try {
val config = new Configuration { }

Expand All @@ -150,7 +159,12 @@ class DagrCoreArgs(
this.reportPath.foreach(p => Io.assertCanWriteFile(p, parentMustExist=false))

val resources = SystemResources(cores = cores.map(Cores(_)), totalMemory = memory.map(Memory(_)))
this.taskManager = Some(new TaskManager(taskManagerResources=resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
if (experimentalExecution) {
this.taskExecutor = Some(new LocalTaskExecutor(systemResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
}
else {
this.taskManager = Some(new TaskManager(taskManagerResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
}

// Print all the arguments if desired.
commandLine.foreach { line =>
Expand All @@ -169,13 +183,13 @@ class DagrCoreArgs(
* Attempts to setup the various directories needed to executed the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline): Int = {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

if (interactive && !Terminal.supportsAnsi) {
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
interactive = false
None
}

def toLoggerOutputStream(): ByteArrayOutputStream = {
Expand All @@ -185,7 +199,21 @@ class DagrCoreArgs(
loggerOutputStream
}

val (finalStatusReporter: FinalStatusReporter, exitCode: Int) = {
val (finalStatusReporter: FinalStatusReporter, exitCode: Int) = if (experimentalExecution) {
val taskExecutor = this.taskExecutor.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val graphExecutor = GraphExecutor(taskExecutor)
if (interactive) {
val reporter = new TopLikeStatusReporter2(
graphExecutor = graphExecutor,
systemResources = taskExecutor.resources,
loggerOut = Some(toLoggerOutputStream()),
print = s => System.out.print(s)
)
graphExecutor.withLogger(reporter)
}
(graphExecutor, graphExecutor execute pipeline)
}
else {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))

val interactiveReporter: Option[PeriodicRefreshingReporter] = if (!interactive) { None } else {
Expand Down Expand Up @@ -213,23 +241,22 @@ class DagrCoreArgs(
pw.close()
}


// return an exit code based on the number of non-completed tasks
taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.notDone(info.status, failedIsDone=false)
}
exitCode
}

}

class DagrCoreMain extends LazyLogging {
protected def name: String = "dagr"

/** A main method that invokes System.exit with the exit code. */
def makeItSoAndExit(args: Array[String]): Unit = System.exit(makeItSo(args))
def makeItSoAndExit(args: Array[String]): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
System.exit(makeItSo(args))
}

/** A main method that returns an exit code instead of exiting. */
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false): Int = {
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false)
(implicit ex: ExecutionContext): Int = {
// Initialize color options
TermCode.printColor = DagrCoreMain.optionallyConfigure[Boolean](Configuration.Keys.ColorStatus).getOrElse(true)

Expand All @@ -242,7 +269,7 @@ class DagrCoreMain extends LazyLogging {
case Sopt.Failure(usage) =>
System.err.print(usage())
1
case Sopt.CommandSuccess(cmd) =>
case Sopt.CommandSuccess(_) =>
unreachable("CommandSuccess should never be returned by parseCommandAndSubCommand.")
case Sopt.SubcommandSuccess(dagr, pipeline) =>
val name = pipeline.getClass.getSimpleName
Expand Down
127 changes: 127 additions & 0 deletions core/src/main/scala/dagr/core/execsystem2/DependencyGraph.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.execsystem2

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.tasksystem.Task

trait DependencyGraph {

/** Add a task to the dependency graph and returns true if the task has no dependencies, false otherwise. */
def add(task: Task): Boolean

/** None if the task was already added, true if the task was added and has no dependencies, false otherwise. */
def maybeAdd(task: Task): Option[Boolean]

/** Removes this task as a dependency for all other tasks in this dependency graph. The task should not depend on
* any tasks, and all tasks that depend on it will have their dependency on this task removed.
*/
def remove(task: Task): Seq[Task]

/** Returns None if the task is not in the graph, true if it has dependencies, false otherwise.
*/
def hasDependencies(task: Task): Option[Boolean]

/** Returns true if the task is in the graph, false otherwise. */
def contains(task: Task): Boolean

def size: Int

def exceptIfCyclicalDependency(task: Task): Unit
}

object DependencyGraph {
def apply(): DependencyGraph = new SimpleDependencyGraph
}

/**
* Simple dependency graph that uses a [[CountDownLatch]] on the number of dependencies for a [[Task]] to block until a
* task has no dependencies.
*/
private class SimpleDependencyGraph extends DependencyGraph with LazyLogging {
import scala.collection.mutable

// NB: I think that the dependents in Task could be updated while were are doing this! How do we synchronize? Do we
// have a global lock in the Task object?
def exceptIfCyclicalDependency(task: Task): Unit = this.synchronized {
// check for cycles
if (Task.hasCycle(task)) {
logger.error("Task was part of a graph that had a cycle")
for (component <- Task.findStronglyConnectedComponents(task = task)) {
if (Task.isComponentACycle(component = component)) {
logger.error("Tasks were part of a strongly connected component with a cycle: "
+ component.map(t => s"'${t.name}'").mkString(", "))
}
}
throw new IllegalArgumentException(s"Task was part of a graph that had a cycle '${task.name}'")
}
}

// FIXME: no need to be package private
private[execsystem2] val graph: mutable.Map[Task, AtomicInteger] = ExecDef.concurrentMap()

def maybeAdd(task: Task): Option[Boolean] = this.synchronized { if (contains(task)) None else Some(add(task)) }

def add(task: Task): Boolean = this.synchronized {
require(!this.graph.contains(task), s"Task '${task.name}' is already part of the dependency graph")
this.graph.put(task, new AtomicInteger(task.tasksDependedOn.size))
!this.hasDependencies(task).get
}

/** Removes this task from the dependency graph. It should not depend on any tasks itself, and all tasks that depend
* on it will have their dependency on this task removed. Returns any dependent task that now has no more
* dependencies.
*/
def remove(task: Task): Seq[Task] = {
// NB: we decrement the countdown latch for each task that depended on this task. If this decrement causes any of
// the dependent tasks to have their latch reach zero, then they will return from waiting on their latch and exit
// the [[add]] method.
require(task.tasksDependedOn.isEmpty,
s"Removing a task '${task.name}' from the dependency graph that has dependencies: "
+ task.tasksDependedOn.map(_.name).mkString(", "))
// remove this as a dependency for all other tasks that depend on this task
task.tasksDependingOnThisTask.flatMap { dependent =>
dependent.synchronized {
require(this.graph.contains(dependent), s"Dependent '${dependent.name}' not in the dependency graph")
task !=> dependent
val latch = this.graph(dependent)
if (latch.decrementAndGet() == 0) Some(dependent) else None
}
}.toSeq
}

def hasDependencies(task: Task): Option[Boolean] = {
this.graph.get(task).map { e => e.get() > 0 }
}

def contains(task: Task): Boolean = this.graph.contains(task)

def size: Int = this.graph.size
}

43 changes: 43 additions & 0 deletions core/src/main/scala/dagr/core/execsystem2/ExecDef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.execsystem2

import scala.collection.mutable

private[execsystem2] object ExecDef {
/** Create a thread-safe mutable map. */
def concurrentMap[A,B](): mutable.Map[A,B] = {
import scala.collection.convert.decorateAsScala._
new java.util.concurrent.ConcurrentHashMap[A, B]().asScala
}

def concurrentSet[A](): mutable.Set[A] = {
import scala.collection.convert.decorateAsScala._
val map: java.util.Map[A, java.lang.Boolean] = new java.util.concurrent.ConcurrentHashMap[A, java.lang.Boolean]()
val set: java.util.Set[A] = java.util.Collections.newSetFromMap[A](map)
set.asScala
}
}
Loading

0 comments on commit 63b65b5

Please sign in to comment.