Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

WIP: web service API #304

Open
wants to merge 11 commits into
base: nh_exec2_replay
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ lazy val commonSettings = Seq(
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", Option(System.getenv("TEST_HTML_REPORTS")).getOrElse(htmlReportsDirectory)),
testOptions in Test += Tests.Argument("-l", "LongRunningTest"), // ignores long running tests
// uncomment for full stack traces
//testOptions in Test += Tests.Argument("-oD"),
testOptions in Test += Tests.Argument("-oDF"),
fork in Test := true,
resolvers += Resolver.jcenterRepo,
resolvers += Resolver.sonatypeRepo("public"),
Expand Down Expand Up @@ -123,6 +123,7 @@ lazy val core = Project(id="dagr-core", base=file("core"))
"com.fulcrumgenomics" %% "commons" % "0.2.0",
"com.fulcrumgenomics" %% "sopt" % "0.2.0",
"com.github.dblock" % "oshi-core" % "3.3",
"com.beachape" %% "enumeratum" % "1.5.12",
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scala-lang" % "scala-compiler" % scalaVersion.value,
"org.reflections" % "reflections" % "0.9.10",
Expand Down Expand Up @@ -163,6 +164,29 @@ lazy val pipelines = Project(id="dagr-pipelines", base=file("pipelines"))
.disablePlugins(sbtassembly.AssemblyPlugin)
.dependsOn(tasks, core)

////////////////////////////////////////////////////////////////////////////////////////////////
// webservice project
////////////////////////////////////////////////////////////////////////////////////////////////
val akkaV = "2.3.9"
val sprayV = "1.3.3"
lazy val webservice = Project(id="dagr-webservice", base=file("webservice"))
.settings(commonSettings: _*)
.settings(unidocSettings: _*)
.settings(assemblySettings: _*)
.settings(description := "A tool to execute tasks in directed acyclic graphs.")
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"io.spray" %% "spray-can" % sprayV,
"io.spray" %% "spray-routing" % sprayV,
"io.spray" %% "spray-client" % sprayV,
"io.spray" %% "spray-http" % sprayV,
"io.spray" %% "spray-json" % sprayV
)
)
.aggregate(core, tasks, pipelines)
.dependsOn(core, tasks, pipelines)

////////////////////////////////////////////////////////////////////////////////////////////////
// root (dagr) project
////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -175,8 +199,8 @@ lazy val root = Project(id="dagr", base=file("."))
.settings(unidocSettings: _*)
.settings(assemblySettings: _*)
.settings(description := "A tool to execute tasks in directed acyclic graphs.")
.aggregate(core, tasks, pipelines)
.dependsOn(core, tasks, pipelines)
.aggregate(core, tasks, pipelines, webservice) // FIXME: should not depend on webservice
.dependsOn(core, tasks, pipelines, webservice)

////////////////////////////////////////////////////////////////////////////////////////////////
// Merge strategy for assembly
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/dagr/core/DagrDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ package dagr.core
object DagrDef {
/** The type of identifier used to uniquely identify tasks tracked by the execution system. */
type TaskId = BigInt

/** Companion methods for TaskId */
object TaskId {
/** The apply method for TaskId */
def apply(value: Int): TaskId = BigInt(value)
}
}
137 changes: 89 additions & 48 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,29 @@
*/
package dagr.core.cmdline

import java.io.{ByteArrayOutputStream, PrintStream, PrintWriter}
import java.io.PrintWriter
import java.net.InetAddress
import java.nio.file.{Files, Path}
import java.text.DecimalFormat

import com.fulcrumgenomics.commons.CommonsDef.unreachable
import com.fulcrumgenomics.commons.CommonsDef.{FilePath, unreachable}
import com.fulcrumgenomics.commons.io.{Io, PathUtil}
import com.fulcrumgenomics.commons.util.{LazyLogging, LogLevel, Logger}
import com.fulcrumgenomics.sopt.Sopt.CommandSuccess
import com.fulcrumgenomics.sopt.cmdline.{CommandLineParser, CommandLineProgramParserStrings, ValidationException}
import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, ArgTokenizer, OptionParser}
import com.fulcrumgenomics.sopt.cmdline.{CommandLineProgramParserStrings, ValidationException}
import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, ArgTokenizer}
import com.fulcrumgenomics.sopt.util.TermCode
import com.fulcrumgenomics.sopt.{OptionName, Sopt, arg}
import com.fulcrumgenomics.sopt.{Sopt, arg}
import dagr.core.config.Configuration
import dagr.core.execsystem._
import dagr.core.exec._
import dagr.core.reporting.{ExecutionLogger, Terminal, TopLikeStatusReporter}
import dagr.core.tasksystem.Pipeline

import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Success


object DagrCoreMain extends Configuration {
/** The packages we wish to include in our command line **/
Expand All @@ -51,8 +55,9 @@ object DagrCoreMain extends Configuration {
}

/** The main method */
/** The main method */
def main(args: Array[String]): Unit = new DagrCoreMain().makeItSoAndExit(args)
def main(args: Array[String]): Unit = {
new DagrCoreMain[DagrCoreArgs]().makeItSoAndExit(args)
}

/** Provide a command line validation error message */
private[cmdline] def buildErrorMessage(msgOption: Option[String] = None, exceptionOption: Option[Exception] = None): String = {
Expand Down Expand Up @@ -103,11 +108,15 @@ class DagrCoreArgs(
@arg(doc = "Write an execution report to this file, otherwise write to the stdout")
val report: Option[Path] = None,
@arg(doc = "Provide an top-like interface for tasks with the give delay in seconds. This suppress info logging.")
var interactive: Boolean = false
var interactive: Boolean = false,
@arg(doc = "Use the experimental execution system.")
val experimentalExecution: Boolean = false,
@arg(doc = "Attempt to replay using the provided replay log")
val replayLog: Option[FilePath] = None
) extends LazyLogging {

// These are not optional, but are only populated during configure()
private var taskManager : Option[TaskManager] = None
private var executor : Option[Executor] = None
private var reportPath : Option[Path] = None

// Initialize the configuration as early as possible
Expand All @@ -121,11 +130,11 @@ class DagrCoreArgs(
/** Try to create a given directory, and if there is an exception, write the path since some exceptions can be obtuse */
private def mkdir(dir: Path, use: String, errors: ListBuffer[String]): Unit = {
try { Files.createDirectories(dir) }
catch { case e: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
catch { case _: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
}

// Invoked by DagrCommandLineParser after the pipeline has also been instantiated
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None) : Unit = {
protected[dagr] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = {
try {
val config = new Configuration { }

Expand All @@ -149,7 +158,14 @@ class DagrCoreArgs(
this.reportPath.foreach(p => Io.assertCanWriteFile(p, parentMustExist=false))

val resources = SystemResources(cores = cores.map(Cores(_)), totalMemory = memory.map(Memory(_)))
this.taskManager = Some(new TaskManager(taskManagerResources=resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
this.executor = Some(
Executor(
experimentalExecution = experimentalExecution,
resources = resources,
scriptsDirectory = scriptsDirectory,
logDirectory = logDirectory
)
)

// Print all the arguments if desired.
commandLine.foreach { line =>
Expand All @@ -164,56 +180,81 @@ class DagrCoreArgs(
}
}

/**
* Attempts to setup the various directories needed to executed the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline): Int = {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

val interactiveReporter: Option[TopLikeStatusReporter] = interactive match {
case true if !Terminal.supportsAnsi =>
protected def executeSetup(executor: Executor, report: FilePath): Unit = {
// Set up an interactive logger if desired and supported
if (this.interactive) {
if (Terminal.supportsAnsi) {
executor.withReporter(TopLikeStatusReporter(executor))
}
else {
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
interactive = false
None
case true =>
val loggerOutputStream = new ByteArrayOutputStream()
val loggerPrintStream = new PrintStream(loggerOutputStream)
Logger.out = loggerPrintStream
Some(new TopLikeStatusReporter(taskMan, Some(loggerOutputStream), print = (s: String) => System.out.print(s)))
case false => None
}
}

// Set up the execution logger whose output can be used later for replay
{
val logName = "replay_log.csv"
val log = if (Seq(Io.StdOut, PathUtil.pathTo("/dev/stderr"), Io.DevNull).contains(report)) {
executor.logDir.resolve(logName)
}
else {
report.getParent.resolve(logName)
}
val executionLogger = new ExecutionLogger(log)
executor.withReporter(executionLogger)
}
interactiveReporter.foreach(_.start())

taskMan.addTask(pipeline)
taskMan.runToCompletion(this.failFast)
// Set up the task cache (in case of replay)
this.replayLog.foreach { log =>
executor.withReporter(TaskCache(log))
}
}

protected def executeFinish(executor: Executor, report: FilePath): Unit = {
// Write out the execution report
if (!interactive || Io.StdOut != report) {
val pw = new PrintWriter(Io.toWriter(report))
taskMan.logReport({ str: String => pw.write(str + "\n") })
executor.logReport({ str: String => pw.write(str + "\n") })
pw.close()
}

interactiveReporter.foreach(_.shutdown())

// return an exit code based on the number of non-completed tasks
taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.isTaskNotDone(info.status, failedIsDone=false)
}
}

/**
* Attempts to setup the various directories needed to executed the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

// Get the executor
val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()"))

// Set up any task prior to execution
executeSetup(executor, report)

// execute
val exitCode = executor.execute(pipeline)

// complete any shutdown tasks after execution
executeFinish(executor, report)

exitCode
}
}

class DagrCoreMain extends LazyLogging {
class DagrCoreMain[Args<:DagrCoreArgs:TypeTag:ClassTag] extends LazyLogging {
protected def name: String = "dagr"

/** A main method that invokes System.exit with the exit code. */
def makeItSoAndExit(args: Array[String]): Unit = System.exit(makeItSo(args))
def makeItSoAndExit(args: Array[String]): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
System.exit(makeItSo(args))
}

/** A main method that returns an exit code instead of exiting. */
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false): Int = {
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false)
(implicit ex: ExecutionContext): Int = {
// Initialize color options
TermCode.printColor = DagrCoreMain.optionallyConfigure[Boolean](Configuration.Keys.ColorStatus).getOrElse(true)

Expand All @@ -222,11 +263,11 @@ class DagrCoreMain extends LazyLogging {

val startTime = System.currentTimeMillis()
val packages = Sopt.find[Pipeline](packageList, includeHidden=includeHidden)
val exit = Sopt.parseCommandAndSubCommand[DagrCoreArgs,Pipeline](name, args, packages) match {
val exit = Sopt.parseCommandAndSubCommand[Args,Pipeline](name, args, packages) match {
case Sopt.Failure(usage) =>
System.err.print(usage())
1
case Sopt.CommandSuccess(cmd) =>
case Sopt.CommandSuccess(_) =>
unreachable("CommandSuccess should never be returned by parseCommandAndSubCommand.")
case Sopt.SubcommandSuccess(dagr, pipeline) =>
val name = pipeline.getClass.getSimpleName
Expand Down
13 changes: 4 additions & 9 deletions core/src/main/scala/dagr/core/config/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import java.io.File
import java.nio.file.{Files, Path}
import java.time.Duration

import com.typesafe.config.{ConfigParseOptions, ConfigFactory, Config}
import com.typesafe.config.ConfigException.Generic
import com.fulcrumgenomics.commons.io.PathUtil._
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.execsystem.{Cores, Memory}
import com.typesafe.config.ConfigException.Generic
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import dagr.core.exec.{Cores, Memory}

import scala.collection.JavaConversions._
import scala.collection.SortedSet
import scala.reflect.runtime.universe.{TypeTag, typeOf}
import collection.JavaConversions._

/**
* Companion object to the Configuration trait that keeps track of all configuration keys
Expand Down Expand Up @@ -228,11 +228,6 @@ private[config] trait ConfigurationLike extends LazyLogging {
*/
protected def systemPath : Seq[Path] = config.getString(Configuration.Keys.SystemPath).split(File.pathSeparatorChar).view.map(pathTo(_))

/** Removes various characters from the simple class name, for scala class names. */
private def sanitizeSimpleClassName(className: String): String = {
className.replaceFirst("[$].*$", "")
}

/** Searches the system path for the executable and return the full path. */
private def findInPath(executable: String) : Option[Path] = {
systemPath.map(p => p.resolve(executable)).find(ex => Files.exists(ex))
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/scala/dagr/core/exec/ExecDef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* The MIT License
*
* Copyright (c) 2017 Fulcrum GenomicsLLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package dagr.core.exec

import scala.collection.mutable

object ExecDef {
/** Create a thread-safe mutable map. */
def concurrentMap[A,B](): mutable.Map[A,B] = {
import scala.collection.convert.decorateAsScala._
new java.util.concurrent.ConcurrentHashMap[A, B]().asScala
}

/** Create a thread-safe mutable set. */
def concurrentSet[A](): mutable.Set[A] = {
import scala.collection.convert.decorateAsScala._
val map: java.util.Map[A, java.lang.Boolean] = new java.util.concurrent.ConcurrentHashMap[A, java.lang.Boolean]()
val set: java.util.Set[A] = java.util.Collections.newSetFromMap[A](map)
set.asScala
}
}
Loading