From 57ac01f144fda3cdf2b69f5211e47c54c499d5e9 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Sun, 25 Jun 2017 20:25:00 -0700 Subject: [PATCH] web 2.0 --- build.sbt | 29 +++- core/src/main/scala/dagr/core/DagrDef.scala | 6 + .../dagr/core/cmdline/DagrCoreMain.scala | 49 ++++-- .../main/scala/dagr/core/exec/Executor.scala | 2 +- .../main/scala/dagr/core/exec/TaskCache.scala | 2 +- .../core/execsystem/TaskManagerLike.scala | 2 +- .../dagr/core/execsystem2/GraphExecutor.scala | 2 +- .../dagr/core/reporting/ExecutionLogger.scala | 6 +- .../dagr/core/reporting/ReportingDef.scala | 4 +- .../core/reporting/TaskStatusLogger.scala | 4 +- .../reporting/TopLikeStatusReporter.scala | 4 +- .../scala/dagr/core/tasksystem/Task.scala | 61 ++++++- .../dagr/core/exec/SimpleTaskCacheTest.scala | 6 +- .../core/execsystem2/GraphExecutorTest.scala | 2 +- src/main/scala/dagr/cmdline/DagrMain.scala | 5 +- webservice/README.md | 14 ++ webservice/build.sbt | 2 + .../main/resources/example.webservice.conf | 40 +++++ .../scala/dagr/webservice/ApiDataModels.scala | 43 +++++ .../scala/dagr/webservice/Configuration.scala | 35 ++++ .../dagr/webservice/DagrApiHandler.scala | 83 +++++++++ .../dagr/webservice/DagrApiJsonSupport.scala | 162 ++++++++++++++++++ .../dagr/webservice/DagrApiService.scala | 133 ++++++++++++++ .../scala/dagr/webservice/DagrServer.scala | 122 +++++++++++++ .../dagr/webservice/DagrServerMain.scala | 112 ++++++++++++ .../scala/dagr/webservice/PerRequest.scala | 149 ++++++++++++++++ 26 files changed, 1032 insertions(+), 47 deletions(-) create mode 100644 webservice/README.md create mode 100644 webservice/build.sbt create mode 100644 webservice/src/main/resources/example.webservice.conf create mode 100644 webservice/src/main/scala/dagr/webservice/ApiDataModels.scala create mode 100644 webservice/src/main/scala/dagr/webservice/Configuration.scala create mode 100644 webservice/src/main/scala/dagr/webservice/DagrApiHandler.scala create mode 100644 
webservice/src/main/scala/dagr/webservice/DagrApiJsonSupport.scala create mode 100644 webservice/src/main/scala/dagr/webservice/DagrApiService.scala create mode 100644 webservice/src/main/scala/dagr/webservice/DagrServer.scala create mode 100644 webservice/src/main/scala/dagr/webservice/DagrServerMain.scala create mode 100644 webservice/src/main/scala/dagr/webservice/PerRequest.scala diff --git a/build.sbt b/build.sbt index e5ee5b6c..56fcbc22 100644 --- a/build.sbt +++ b/build.sbt @@ -123,7 +123,7 @@ lazy val core = Project(id="dagr-core", base=file("core")) "com.fulcrumgenomics" %% "commons" % "0.2.0-SNAPSHOT", "com.fulcrumgenomics" %% "sopt" % "0.2.0-SNAPSHOT", "com.github.dblock" % "oshi-core" % "3.3", - "com.beachape" %% "enumeratum" % "1.5.12", + "com.beachape" %% "enumeratum" % "1.5.12", "org.scala-lang" % "scala-reflect" % scalaVersion.value, "org.scala-lang" % "scala-compiler" % scalaVersion.value, "org.reflections" % "reflections" % "0.9.10", @@ -164,6 +164,29 @@ lazy val pipelines = Project(id="dagr-pipelines", base=file("pipelines")) .disablePlugins(sbtassembly.AssemblyPlugin) .dependsOn(tasks, core) +//////////////////////////////////////////////////////////////////////////////////////////////// +// webservice project +//////////////////////////////////////////////////////////////////////////////////////////////// +val akkaV = "2.3.9" +val sprayV = "1.3.3" +lazy val webservice = Project(id="dagr-webservice", base=file("webservice")) + .settings(commonSettings: _*) + .settings(unidocSettings: _*) + .settings(assemblySettings: _*) + .settings(description := "A tool to execute tasks in directed acyclic graphs.") + .settings( + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % akkaV, + "io.spray" %% "spray-can" % sprayV, + "io.spray" %% "spray-routing" % sprayV, + "io.spray" %% "spray-client" % sprayV, + "io.spray" %% "spray-http" % sprayV, + "io.spray" %% "spray-json" % sprayV + ) + ) + .aggregate(core, tasks, pipelines) + 
.dependsOn(core, tasks, pipelines) + //////////////////////////////////////////////////////////////////////////////////////////////// // root (dagr) project //////////////////////////////////////////////////////////////////////////////////////////////// @@ -176,8 +199,8 @@ lazy val root = Project(id="dagr", base=file(".")) .settings(unidocSettings: _*) .settings(assemblySettings: _*) .settings(description := "A tool to execute tasks in directed acyclic graphs.") - .aggregate(core, tasks, pipelines) - .dependsOn(core, tasks, pipelines) + .aggregate(core, tasks, pipelines, webservice) // FIXME: should not depend on webservice + .dependsOn(core, tasks, pipelines, webservice) //////////////////////////////////////////////////////////////////////////////////////////////// // Merge strategy for assembly diff --git a/core/src/main/scala/dagr/core/DagrDef.scala b/core/src/main/scala/dagr/core/DagrDef.scala index 8d352113..63624915 100644 --- a/core/src/main/scala/dagr/core/DagrDef.scala +++ b/core/src/main/scala/dagr/core/DagrDef.scala @@ -34,4 +34,10 @@ package dagr.core object DagrDef { /** The type of identifier used to uniquely identify tasks tracked by the execution system. 
*/ type TaskId = BigInt + + /** Companion methods for TaskId */ + object TaskId { + /** The apply method for TaskId */ + def apply(value: Int): TaskId = BigInt(value) + } } diff --git a/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala b/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala index 5b1863de..813c8cfd 100644 --- a/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala +++ b/core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala @@ -42,8 +42,11 @@ import dagr.core.tasksystem.Pipeline import scala.collection.mutable.ListBuffer import scala.concurrent.ExecutionContext +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag import scala.util.Success + object DagrCoreMain extends Configuration { /** The packages we wish to include in our command line **/ protected def getPackageList: List[String] = { @@ -51,10 +54,9 @@ object DagrCoreMain extends Configuration { config.optionallyConfigure[List[String]](Configuration.Keys.PackageList) getOrElse List[String]("dagr") } - /** The main method */ /** The main method */ def main(args: Array[String]): Unit = { - new DagrCoreMain().makeItSoAndExit(args) + new DagrCoreMain[DagrCoreArgs]().makeItSoAndExit(args) } /** Provide a command line validation error message */ @@ -132,7 +134,7 @@ class DagrCoreArgs( } // Invoked by DagrCommandLineParser after the pipeline has also been instantiated - private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = { + protected[dagr] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = { try { val config = new Configuration { } @@ -178,16 +180,7 @@ class DagrCoreArgs( } } - /** - * Attempts to setup the various directories needed to executed the pipeline, execute it, and generate - * an execution report. 
- */ - protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = { - val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()")) - - // Get the executor - val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()")) - + protected def executeSetup(executor: Executor, report: FilePath): Unit = { // Set up an interactive logger if desired and supported if (this.interactive) { if (Terminal.supportsAnsi) { @@ -215,10 +208,9 @@ class DagrCoreArgs( this.replayLog.foreach { log => executor.withReporter(TaskCache(log)) } + } - // execute - val exitCode = executor.execute(pipeline) - + protected def executeFinish(executor: Executor, report: FilePath): Unit = { // Write out the execution report if (!interactive || Io.StdOut != report) { val pw = new PrintWriter(Io.toWriter(report)) @@ -226,11 +218,32 @@ class DagrCoreArgs( pw.close() } + } + + /** + * Attempts to setup the various directories needed to executed the pipeline, execute it, and generate + * an execution report. + */ + protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = { + val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()")) + + // Get the executor + val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()")) + + // Set up any task prior to execution + executeSetup(executor, report) + + // execute + val exitCode = executor.execute(pipeline) + + // complete any shutdown tasks after execution + executeFinish(executor, report) + exitCode } } -class DagrCoreMain extends LazyLogging { +class DagrCoreMain[Args<:DagrCoreArgs:TypeTag:ClassTag] extends LazyLogging { protected def name: String = "dagr" /** A main method that invokes System.exit with the exit code. 
*/ @@ -250,7 +263,7 @@ class DagrCoreMain extends LazyLogging { val startTime = System.currentTimeMillis() val packages = Sopt.find[Pipeline](packageList, includeHidden=includeHidden) - val exit = Sopt.parseCommandAndSubCommand[DagrCoreArgs,Pipeline](name, args, packages) match { + val exit = Sopt.parseCommandAndSubCommand[Args,Pipeline](name, args, packages) match { case Sopt.Failure(usage) => System.err.print(usage()) 1 diff --git a/core/src/main/scala/dagr/core/exec/Executor.scala b/core/src/main/scala/dagr/core/exec/Executor.scala index 3757e89b..ba34d6f3 100644 --- a/core/src/main/scala/dagr/core/exec/Executor.scala +++ b/core/src/main/scala/dagr/core/exec/Executor.scala @@ -109,7 +109,7 @@ trait Executor extends FinalStatusReporter { } /** Returns the task status by ordinal */ - def from(ordinal: Int): TaskStatus + def statusFrom(ordinal: Int): TaskStatus /** Returns the log directory. */ def logDir: DirPath diff --git a/core/src/main/scala/dagr/core/exec/TaskCache.scala b/core/src/main/scala/dagr/core/exec/TaskCache.scala index f48774f8..8c705a0e 100644 --- a/core/src/main/scala/dagr/core/exec/TaskCache.scala +++ b/core/src/main/scala/dagr/core/exec/TaskCache.scala @@ -305,7 +305,7 @@ class SimpleTaskCache(replayLogLines: Seq[String], source: Option[String] = None require(task._executor.isDefined, s"Executor not defined for task '${task.name}'") this.statuses.filter { status => status.definitionCode == previousDefinition.code } .sortBy(-_.statusOrdinal) - .map(status => task._executor.map(_.from(status.statusOrdinal).success).getOrElse(unreachable(s"Executor not set for task '${task.name}'"))) + .map(status => task._executor.map(_.statusFrom(status.statusOrdinal).success).getOrElse(unreachable(s"Executor not set for task '${task.name}'"))) .headOption match { case Some(s) if s => Unit // if it previously succeeded, don't set it to execute! 
case _ => setToExecute(task) diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala b/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala index bb362ff1..6f00937e 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskManagerLike.scala @@ -162,5 +162,5 @@ private[execsystem] trait TaskManagerLike extends Executor { def stepExecution(): (Traversable[Task], Traversable[Task], Traversable[Task], Traversable[Task]) /** Returns the task status by ordinal */ - final def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal) + final def statusFrom(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal) } diff --git a/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala b/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala index 1a73921b..d4571940 100644 --- a/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala +++ b/core/src/main/scala/dagr/core/execsystem2/GraphExecutor.scala @@ -55,7 +55,7 @@ trait GraphExecutor[T<:Task] extends Executor { def resources: Option[SystemResources] = Some(this.taskExecutor.resources) /** Returns the task status by ordinal */ - final def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal) + final def statusFrom(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal) /** Returns the executor that execute tasks. 
*/ protected def taskExecutor: TaskExecutor[T] diff --git a/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala b/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala index 0589f9af..f3201434 100644 --- a/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala +++ b/core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala @@ -30,7 +30,7 @@ import com.fulcrumgenomics.commons.io.Io import dagr.core.reporting.ExecutionLogger.{Definition, Relationship, Status} import dagr.core.reporting.ReportingDef._ import dagr.core.tasksystem.Task -import dagr.core.tasksystem.Task.{TaskStatus, TaskInfo => RootTaskInfo} +import dagr.core.tasksystem.Task.{TaskInfoLike, TaskStatus, TaskInfo => RootTaskInfo} import scala.collection.mutable @@ -167,7 +167,7 @@ class ExecutionLogger(log: FilePath) extends TaskLogger with TaskRegister with A } /** The method that will be called with updated task information. */ - def record(info: RootTaskInfo): Unit = logStatusChange(info) + def record(info: TaskInfoLike): Unit = logStatusChange(info) def close(): Unit = if (!this.closed) { this.writer.flush() @@ -203,7 +203,7 @@ class ExecutionLogger(log: FilePath) extends TaskLogger with TaskRegister with A } /** Logs the a status update for the task. 
*/ - private def logStatusChange(rootInfo: RootTaskInfo): Unit = { + private def logStatusChange(rootInfo: TaskInfoLike): Unit = { val definition = this.taskToDefinition(rootInfo.task) write(Status(rootInfo.status, definition).toString + "\n") } diff --git a/core/src/main/scala/dagr/core/reporting/ReportingDef.scala b/core/src/main/scala/dagr/core/reporting/ReportingDef.scala index 361589ce..ce25acdd 100644 --- a/core/src/main/scala/dagr/core/reporting/ReportingDef.scala +++ b/core/src/main/scala/dagr/core/reporting/ReportingDef.scala @@ -26,7 +26,7 @@ package dagr.core.reporting import dagr.core.tasksystem.Task -import dagr.core.tasksystem.Task.TaskInfo +import dagr.core.tasksystem.Task.TaskInfoLike object ReportingDef { @@ -36,7 +36,7 @@ object ReportingDef { /** Base trait for all classes interested in when the task status changes for any task. */ trait TaskLogger extends TaskReporter { /** The method that will be called with updated task information. */ - def record(info: TaskInfo): Unit + def record(info: TaskInfoLike): Unit } /** Base trait for all classes interested in when a new task is built by another task (ex. diff --git a/core/src/main/scala/dagr/core/reporting/TaskStatusLogger.scala b/core/src/main/scala/dagr/core/reporting/TaskStatusLogger.scala index 86e9a4f9..40262119 100644 --- a/core/src/main/scala/dagr/core/reporting/TaskStatusLogger.scala +++ b/core/src/main/scala/dagr/core/reporting/TaskStatusLogger.scala @@ -27,7 +27,7 @@ package dagr.core.reporting import com.fulcrumgenomics.commons.util.{LazyLogging, Logger} import dagr.core.reporting.ReportingDef.TaskLogger -import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo} +import dagr.core.tasksystem.Task.{TaskInfoLike, TaskInfo => RootTaskInfo} /** A simple logger that delegates to [[dagr.core.tasksystem.Task.TaskInfo#logTaskMessage]]. 
*/ class TaskStatusLogger extends TaskLogger { @@ -36,5 +36,5 @@ class TaskStatusLogger extends TaskLogger { override lazy val logger: Logger = new Logger(getClass) } private val logger = new Dagr().logger - def record(info: RootTaskInfo): Unit = info.logTaskMessage(this.logger) + def record(info: TaskInfoLike): Unit = info.logTaskMessage(this.logger) } \ No newline at end of file diff --git a/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala b/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala index d14cbda6..d711e830 100644 --- a/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala +++ b/core/src/main/scala/dagr/core/reporting/TopLikeStatusReporter.scala @@ -36,7 +36,7 @@ import dagr.core.exec.{Executor, SystemResources} import dagr.core.execsystem.TaskManager import dagr.core.execsystem2.GraphExecutor import dagr.core.reporting.ReportingDef.TaskLogger -import dagr.core.tasksystem.Task.TaskInfo +import dagr.core.tasksystem.Task.{TaskInfo, TaskInfoLike} import dagr.core.tasksystem.{InJvmTask, ProcessTask, Task, UnitTask} import scala.collection.mutable @@ -318,7 +318,7 @@ trait TopLikeStatusReporter extends TaskLogger with Terminal { } /** This method is called when any info about a task is updated. */ - final def record(info: TaskInfo): Unit = { + final def record(info: TaskInfoLike): Unit = { // add the task to the set of known tasks this._tasks += info.task // refresh the screen diff --git a/core/src/main/scala/dagr/core/tasksystem/Task.scala b/core/src/main/scala/dagr/core/tasksystem/Task.scala index bad26d7e..6c4d8594 100644 --- a/core/src/main/scala/dagr/core/tasksystem/Task.scala +++ b/core/src/main/scala/dagr/core/tasksystem/Task.scala @@ -55,8 +55,59 @@ object Task { override def toString: String = this.description } + /** The execution information for a task. Used for external read-only access to [[TaskInfo]]. 
Any execution system + * should extend the [[TaskInfo]] class instead to store their specific metadata. */ + trait TaskInfoLike extends Ordered[TaskInfoLike] { + def task : Task + def id : Option[TaskId] + def attempts : Int + def script : Option[FilePath] + def log : Option[FilePath] + def resources : Option[ResourceSet] + def exitCode : Option[Int] + def throwable : Option[Throwable] + + /** The current status of the task. */ + def status : TaskStatus + + /** The instant the task reached a given status. */ + def timePoints : Traversable[TimePoint] + + /** The instant the task reached the current status. */ + def statusTime : Instant + + private[core] def logTaskMessage(logger: Logger): Unit + + def compare(that: TaskInfoLike): Int = { + (this.id, that.id) match { + case (Some(_), None) => -1 + case (None, Some(_)) => 1 + case (None, None) => this.status.ordinal - that.status.ordinal + case (Some(l), Some(r)) => l.compare(r) // not (l - r).toInt: BigInt#toInt keeps only the low 32 bits, which can give 0 or the wrong sign + } + } + } + + object TimePoint { + def parse(s: String, f: Int => TaskStatus): TimePoint = { + s.split(',').toList match { + case _ :: ordinal :: instant :: _ => + TimePoint( + status = f(ordinal.toInt), + instant = Instant.parse(instant) + ) + case _ => + throw new IllegalArgumentException(s"Could not parse TimePoint '$s'") + } + } + } + /** A tuple representing the instant the task was set to the given status. */ - private[core] case class TimePoint(status: TaskStatus, instant: Instant) + case class TimePoint(status: TaskStatus, instant: Instant) { + override def toString: String = { + s"${this.status.name},${this.status.ordinal},${this.instant.toString},${this.status.description}" + } + } /** Execution information associated with a task. Any execution system should extend this class to store * their specific metadata. 
@@ -90,7 +141,7 @@ object Task { var resources : Option[ResourceSet] = None, var exitCode : Option[Int] = None, var throwable : Option[Throwable] = None - ) { + ) extends TaskInfoLike { if (attempts < 1) throw new RuntimeException("attempts must be greater than zero") @@ -295,7 +346,7 @@ trait Task extends Dependable { /** The execution information about this task, or None if not being executed. */ private[core] var _taskInfo : Option[TaskInfo] = None - private[core] def taskInfo : TaskInfo = this._taskInfo.get + private[dagr] def taskInfo : TaskInfo = this._taskInfo.get private[core] def taskInfo_=(info: TaskInfo) = { this._taskInfo = Some(info) this._executor.foreach(_.record(info)) @@ -316,10 +367,10 @@ trait Task extends Dependable { private val dependedOnByTasks = new ListBuffer[Task]() /** Gets the sequence of tasks that this task depends on.. */ - protected[core] def tasksDependedOn: Traversable[Task] = this.dependsOnTasks.toList + protected[dagr] def tasksDependedOn: Traversable[Task] = this.dependsOnTasks.toList /** Gets the sequence of tasks that depend on this task. */ - protected[core] def tasksDependingOnThisTask: Traversable[Task] = this.dependedOnByTasks.toList + protected[dagr] def tasksDependingOnThisTask: Traversable[Task] = this.dependedOnByTasks.toList /** Must be implemented to handle the addition of a dependent. 
*/ override def addDependent(dependent: Dependable): Unit = dependent.headTasks.foreach(t => { diff --git a/core/src/test/scala/dagr/core/exec/SimpleTaskCacheTest.scala b/core/src/test/scala/dagr/core/exec/SimpleTaskCacheTest.scala index db0b10c4..ad6af88b 100644 --- a/core/src/test/scala/dagr/core/exec/SimpleTaskCacheTest.scala +++ b/core/src/test/scala/dagr/core/exec/SimpleTaskCacheTest.scala @@ -43,10 +43,10 @@ class SimpleTaskCacheTest extends FutureUnitSpec { private val executor: Executor = Executor(experimentalExecution=true, resources=SystemResources.infinite) /** The status with the lowest ordinal that does not represent a successful execution status. */ - private val failStatus: TaskStatus = Stream.from(0).map(i => executor.from(i)).find(!_.success).getOrElse(unreachable("No unsuccessful status found")) + private val failStatus: TaskStatus = Stream.from(0).map(i => executor.statusFrom(i)).find(!_.success).getOrElse(unreachable("No unsuccessful status found")) /** The status with the lowest ordinal that represents a successful execution status. 
*/ - private val successfulStatus: TaskStatus = Stream.from(0).map(i => executor.from(i)).find(_.success).getOrElse(unreachable("No successful status found")) + private val successfulStatus: TaskStatus = Stream.from(0).map(i => executor.statusFrom(i)).find(_.success).getOrElse(unreachable("No successful status found")) /** A simple implicit to set the default executor */ implicit private class WithExecutor[T <: Task](task: T) { @@ -95,7 +95,7 @@ class SimpleTaskCacheTest extends FutureUnitSpec { it should "fail if a task definition is not found for a status" in { val definition = Definition.buildRootDefinition(new NoOpTask) - val status = Status(executor.from(0), definition) + val status = Status(executor.statusFrom(0), definition) val lines = Seq(status).map(_.toString) val exception = intercept[Exception] { new SimpleTaskCache(lines) } exception.getMessage should include (s"missing a definition for task '${definition.code}' with status '$status'") diff --git a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala index d4fc04b4..51ca2b91 100644 --- a/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala +++ b/core/src/test/scala/dagr/core/execsystem2/GraphExecutorTest.scala @@ -457,7 +457,7 @@ class GraphExecutorTest extends GraphExecutorUnitSpec { "GraphExecute.from" should "return the task status from the ordinal" in { val graphExecutor = this.graphExecutor TaskStatus.values.foreach { status => - graphExecutor.from(status.ordinal) shouldBe status + graphExecutor.statusFrom(status.ordinal) shouldBe status } } diff --git a/src/main/scala/dagr/cmdline/DagrMain.scala b/src/main/scala/dagr/cmdline/DagrMain.scala index 830a1160..7e8eae0b 100644 --- a/src/main/scala/dagr/cmdline/DagrMain.scala +++ b/src/main/scala/dagr/cmdline/DagrMain.scala @@ -31,8 +31,5 @@ object DagrMain { protected def getPackageList: List[String] = List[String]("dagr") /** The main method */ - def 
main(args: Array[String]): Unit = { - import scala.concurrent.ExecutionContext.Implicits.global - System.exit(new DagrCoreMain().makeItSo(args, packageList = getPackageList)) - } + def main(args: Array[String]): Unit = DagrCoreMain.main(args) } diff --git a/webservice/README.md b/webservice/README.md new file mode 100644 index 00000000..35c010cc --- /dev/null +++ b/webservice/README.md @@ -0,0 +1,14 @@ +# Dagr Web Service API + +## Developer Notes + +This document describes how to add a route to the web-service API. This is so we don't forget how to do it in the future. + +1. Add a `case class` to `ApiDataModels.scala` to store the response data. +2. Add any custom support for reading and writing the JSON for the data stored in that response `case class` to `DagrApiJsonSupport`. +3. Add a `lazy val` to `DagrApiJsonSupport` to identify the protocol to use for JSON support. +4. Add a `case class` to `DagrApiHandler` that stores any information used to handle the request. +5. In the `receive` method in `DagrApiHandler`, add a `case` to match the request `case class` you just created in the previous step, then do any magic, and finally create a response `class` as described in the first step. +6. Define a new method called `*Route` in `DagrApiService` and add it to the list of routes (`routes`). + +If you get any error about marshalling, you probably did not perform step 3. 
\ No newline at end of file diff --git a/webservice/build.sbt b/webservice/build.sbt new file mode 100644 index 00000000..077b6c18 --- /dev/null +++ b/webservice/build.sbt @@ -0,0 +1,2 @@ +assemblyJarName in assembly := "dagr-webservice-" + version.value + ".jar" +mainClass in assembly := Some("dagr.webservice.DagrServerMain") diff --git a/webservice/src/main/resources/example.webservice.conf b/webservice/src/main/resources/example.webservice.conf new file mode 100644 index 00000000..b290cecc --- /dev/null +++ b/webservice/src/main/resources/example.webservice.conf @@ -0,0 +1,40 @@ +# Example configuration file for DAGR that contains all possible keys +# for the default set of tasks. This should be edited for your local +# setup. + +# DAGR Configuration - this section can be omitted entirely from application configuration +# files as dagr's reference.conf will provide sensible defaults where they are needed. +dagr = { + command-line-name = "dagr" + path = ${PATH} // tell dagr to search for executables using the system path by default + # script-directory = ??? // path to a directory to store scripts during pipeline execution + # log-directory = ??? // path to a directory to store the logs generated during pipeline execution + # system-cores = ??? // how many cores can dagr use? defaults to the cores on the current host + # system-memory = ??? // how much memory can dagr use? defaults to 90% of memory on current host + # colors-status = ??? // true to print the command line with color, false otherwise + # webservice-host = ??? // the host name (ex. 0.0.0.0) on which the web-service should run + # webservice-port = ??? 
// the port for the web-service +} + +# Task Configuration +pipeline_dir = /pipeline/packages +picard.jar = ${pipeline_dir}/picard.jar +fgbio.jar = ${pipeline_dir}/fgbio.jar +jeanluc.jar = ${pipeline_dir}/jeanluc.jar +gatk.jar = ${pipeline_dir}/GenomeAnalysisTK.jar +varscan2.jar = ${pipeline_dir}/varscan2.jar +mutect1.jar = ${pipeline_dir}/SomaticAnalysisTK.jar +bwa.executable = ${pipeline_dir}/bwa/bwa +bwa-kit.dir = ${pipeline_dir}/bwa.kit +tabix.executable = ${pipeline_dir}/tabix +verifybamid.executable = ${pipeline_dir}/verifyBamID +samtools.executable = ${pipeline_dir}/samtools/samtools + +# FreeBayes Configuration +freebayes.executable = ${pipeline_dir}/freebayes // required for FreeBayes +freebayes.scripts = ${pipeline_dir}/freebayes/scripts // required for FreeBayes +bgzip.bin = ${pipeline_dir}/htslib // required for FreeBayes +vcflib.bin = ${pipeline_dir}/vcflib/bin // required for filter FreeBayes calls +vcflib.scripts = ${pipeline_dir}/vcflib/scripts // required for filter FreeBayes calls +vt.bin = ${pipeline_dir}/vt/bin // required for filter FreeBayes calls +bcftools.bin = ${pipeline_dir}/bcftools // required for filter FreeBayes calls diff --git a/webservice/src/main/scala/dagr/webservice/ApiDataModels.scala b/webservice/src/main/scala/dagr/webservice/ApiDataModels.scala new file mode 100644 index 00000000..4fbd27ba --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/ApiDataModels.scala @@ -0,0 +1,43 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The 
above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package dagr.webservice + +import dagr.core.tasksystem.Task.TaskInfoLike +import dagr.tasks.DagrDef.FilePath + +/** Stores the data to be returned by an end-point. Make sure that there exists a protocol and any custom JSON + * handling specified in [[DagrApiJsonSupport]]. + */ +sealed abstract class DagrResponse + +case class DagrVersionResponse(id: String) extends DagrResponse + +case class DagrStatusResponse(infos: Iterable[TaskInfoLike]) extends DagrResponse + +case class DagrTaskScriptResponse(script: Option[FilePath]) extends DagrResponse + +case class DagrTaskLogResponse(log: Option[FilePath]) extends DagrResponse + +case class DagrTaskInfoResponse(info: TaskInfoLike) extends DagrResponse diff --git a/webservice/src/main/scala/dagr/webservice/Configuration.scala b/webservice/src/main/scala/dagr/webservice/Configuration.scala new file mode 100644 index 00000000..5a936fb1 --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/Configuration.scala @@ -0,0 +1,35 @@ +/* + * The MIT License + * + * Copyright (c) 2017 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +package dagr.webservice + +object Configuration extends dagr.core.config.Configuration { + + // Keys for configuration values used in dagr webservice + object Keys { + val WebServiceHost = "dagr.webservice-host" + val WebSErvicePort = "dagr.webservice-port" + } +} diff --git a/webservice/src/main/scala/dagr/webservice/DagrApiHandler.scala b/webservice/src/main/scala/dagr/webservice/DagrApiHandler.scala new file mode 100644 index 00000000..bab057d0 --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/DagrApiHandler.scala @@ -0,0 +1,83 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or 
substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package dagr.webservice + +import akka.actor.{Actor, ActorSystem, Props} +import akka.util.Timeout +import com.fulcrumgenomics.commons.util.LazyLogging +import dagr.core.DagrDef._ +import dagr.core.tasksystem.Task.TaskInfoLike +import dagr.webservice.PerRequest.RequestComplete +import spray.http.StatusCodes +import spray.httpx.SprayJsonSupport._ + +import scala.concurrent.duration._ + +/** This object stores the definitions for the API requests. */ +object DagrApiHandler { + def props(taskInfoTracker: TaskInfoTracker): Props = { + Props(new DagrApiHandler(taskInfoTracker)) + } + + sealed trait DagrRequest + final case class DagrVersionRequest() extends DagrRequest + final case class DagrStatusRequest() extends DagrRequest + final case class DagrTaskScriptRequest(id: TaskId) extends DagrRequest + final case class DagrTaskLogRequest(id: TaskId) extends DagrRequest + final case class DagrTaskInfoRequest(id: TaskId) extends DagrRequest +} + +/** Receives a request, performs the appropriate logic, and sends back a response */ +class DagrApiHandler(val taskInfoTracker: TaskInfoTracker) extends Actor with DagrApiJsonSupport with LazyLogging { + // needed for marshalling + import DagrApiHandler._ + + implicit val timeout: Timeout = Timeout(2.seconds) + implicit val system: ActorSystem = context.system + + override def receive: PartialFunction[Any, Unit] = { + case DagrVersionRequest() => + context.parent ! 
RequestComplete(StatusCodes.OK, DagrVersionResponse(DagrApiService.version)) + case DagrStatusRequest() => + context.parent ! RequestComplete(StatusCodes.OK, DagrStatusResponse(taskInfoTracker.infos)) + case DagrTaskScriptRequest(id) => + applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskScriptResponse(info.script))) + case DagrTaskLogRequest(id) => + applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskLogResponse(info.log))) + case DagrTaskInfoRequest(id) => + applyTaskInfo(id, info => context.parent ! RequestComplete(StatusCodes.OK, DagrTaskInfoResponse(info))) + } + + /** Handles the case that the specific task identifier does not exist and sends back a bad request message, otherwise, + * it applies the given method. + */ + private def applyTaskInfo(id: TaskId, f: (TaskInfoLike => Unit)): Unit = { + taskInfoTracker.info(id) match { + case Some(info) => f(info) + case None => context.parent ! RequestComplete(StatusCodes.NotFound, s"Task with id '$id' not found") + } + } +} + diff --git a/webservice/src/main/scala/dagr/webservice/DagrApiJsonSupport.scala b/webservice/src/main/scala/dagr/webservice/DagrApiJsonSupport.scala new file mode 100644 index 00000000..74ea7968 --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/DagrApiJsonSupport.scala @@ -0,0 +1,162 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or 
substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package dagr.webservice + +import java.nio.file.{Path, Paths} +import java.time.Instant + +import com.sun.xml.internal.ws.encoding.soap.DeserializationException +import dagr.core.DagrDef._ +import dagr.core.exec.{Cores, Memory, ResourceSet} +import dagr.core.tasksystem.Task.{TaskInfoLike, TaskStatus, TimePoint} +import spray.json._ + +import scala.language.implicitConversions + +/** Methods for formatting custom types in JSON. */ +trait DagrApiJsonSupport extends DefaultJsonProtocol { + implicit val dagrVersionResponseProtocol: RootJsonFormat[DagrVersionResponse] = jsonFormat1(DagrVersionResponse) + implicit val dagrStatusResponseProtocol: RootJsonFormat[DagrStatusResponse] = jsonFormat1(DagrStatusResponse) + implicit val dagrTaskScriptResponseProtocol: RootJsonFormat[DagrTaskScriptResponse] = jsonFormat1(DagrTaskScriptResponse) + implicit val dagrTaskLogResponseProtocol: RootJsonFormat[DagrTaskLogResponse] = jsonFormat1(DagrTaskLogResponse) + implicit val dagrTaskInfoResponseProtocol: RootJsonFormat[DagrTaskInfoResponse] = jsonFormat1(DagrTaskInfoResponse) + + def taskInfoTracker: TaskInfoTracker + + implicit object TaskStatusFormat extends RootJsonFormat[TaskStatus] { + override def write(status: TaskStatus) = JsString(status.ordinal.toString) + + override def read(json: JsValue): TaskStatus = json match { + case JsString(value) => taskInfoTracker.statusFrom(value.toInt) + case _ => throw new DeserializationException("only string 
supported") + } + } + + implicit object PathFormat extends RootJsonFormat[Path] { + override def write(path: Path) = JsString(path.toString) + + override def read(json: JsValue): Path = json match { + case JsString(value) => Paths.get(value) + case _ => throw new DeserializationException("only string supported") + } + } + + implicit object ResoureSetFormat extends RootJsonFormat[ResourceSet] { + override def write(resourceSet: ResourceSet): JsObject = { + var map = Map.empty[String, JsValue] + map += ("cores" -> JsString(resourceSet.cores.toString)) + map += ("memory" -> JsString(resourceSet.memory.prettyString)) + JsObject(map) + } + + override def read(json: JsValue): ResourceSet = { + val jsObject = json.asJsObject + ResourceSet( + cores = Cores(jsObject.fields("cores").convertTo[Float]), + memory = Memory(jsObject.fields("memory").convertTo[String]) + ) + } + } + + implicit object InstantFormat extends RootJsonFormat[Instant] { + override def write(timestamp: Instant) = JsString(timestamp.toString) + + override def read(json: JsValue): Instant = json match { + case JsString(value) => Instant.parse(value) + case _ => throw new DeserializationException("only string supported") + } + } + + implicit object TimePointFormat extends RootJsonFormat[TimePoint] { + override def write(timePoint: TimePoint) = JsString(timePoint.toString) + + override def read(json: JsValue): TimePoint = json match { + case JsString(value) => TimePoint.parse(value, taskInfoTracker.statusFrom) + case _ => throw new DeserializationException("only string supported") + } + } + + implicit object TaskInfoLikeFormat extends RootJsonFormat[TaskInfoLike] { + override def write(info: TaskInfoLike): JsObject = { + var map = Map.empty[String, JsValue] + val dependsOn = info.task.tasksDependedOn.map { d => + s"${d.taskInfo.id.getOrElse("None")},${d.name},${d.taskInfo.status.name},${d.taskInfo.status.description}" + } + val dependents = info.task.tasksDependingOnThisTask.map { d => + 
s"${d.taskInfo.id.getOrElse("None")},${d.name},${d.taskInfo.status.name},${d.taskInfo.status.description}" + } + + map += ("name" -> JsString(info.task.name)) + info.id.foreach { id => map += ("id" -> id.toJson) } + map += ("attempts" -> info.attempts.toJson) + info.script.foreach { script => map += ("script" -> script.toJson) } + info.log.foreach { log => map += ("log" -> log.toJson) } + info.resources.foreach { r => map += ("resources" -> r.toJson) } + info.exitCode.foreach { e => map += ("exit_code" -> e.toJson) } + info.throwable.foreach { t => map += ("throwable" -> t.getMessage.toJson) } + map += ("status" -> info.status.ordinal.toJson) + map += ("time_points" -> info.timePoints.toList.toJson) + map += ("depends_on" -> dependsOn.toList.toJson) + map += ("dependents" -> dependents.toList.toJson) + + JsObject(map) + } + + override def read(json: JsValue): TaskInfoLike = { + val jsObject = json.asJsObject + + val name = jsObject.fields("name").convertTo[String] + val taskId = jsObject.fields("id").convertTo[TaskId] + /* + val attempts = jsObject.fields("attempts").convertTo[Int] + val script = pathOrNone(jsObject, "script") + val log = pathOrNone(jsObject, "log") + val resources = jsObject.fields("resources").convertTo[ResourceSet] + val exitCode = jsObject.fields("exit_code").convertTo[Int] + val throwable = new Throwable(jsObject.fields("throwable").convertTo[String]) // TODO: get the original class? + val status = jsObject.fields("status").convertTo[Int] // NB: the ordinal + val timePoints = jsObject.fields("time_points").convertTo[Traversable[TimePoint]] + */ + + val info = taskInfoTracker.info(taskId).getOrElse { + throw new IllegalArgumentException(s"Could not retrieve task with id '$taskId'") + } + + if (info.task.name.compareTo(name) != 0) throw new IllegalStateException(s"Task names differ! 
'$name' != '${info.task.name}'") + + info + } + } + + /* + private def pathOrNone(json: JsObject, key: String): Option[FilePath] = { + if (json.getFields(key).nonEmpty) json.fields(key).convertTo[Option[FilePath]] else None + } + + private def instantOrNone(json: JsObject, key: String): Option[Instant] = { + if (json.getFields(key).nonEmpty) json.fields(key).convertTo[Option[Instant]] else None + } + */ +} \ No newline at end of file diff --git a/webservice/src/main/scala/dagr/webservice/DagrApiService.scala b/webservice/src/main/scala/dagr/webservice/DagrApiService.scala new file mode 100644 index 00000000..01b5799a --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/DagrApiService.scala @@ -0,0 +1,133 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. 
+ */ + +package dagr.webservice + +import akka.actor.{Actor, ActorContext, Props} +import com.fulcrumgenomics.commons.util.LazyLogging +import dagr.core.DagrDef.TaskId +import spray.http.StatusCodes +import spray.http.StatusCodes._ +import spray.routing._ +import spray.util.LoggingContext + +import scala.concurrent.ExecutionContextExecutor +import scala.util.{Failure, Success, Try} + +object DagrApiServiceActor { + def props(taskInfoTracker: TaskInfoTracker): Props = Props(classOf[DagrApiServiceActor], taskInfoTracker) +} + +/** The main actor for the API service */ +class DagrApiServiceActor(taskInfoTracker: TaskInfoTracker) extends HttpServiceActor with LazyLogging { + implicit val executionContext: ExecutionContextExecutor = actorRefFactory.dispatcher + + trait ActorRefFactoryContext { + def actorRefFactory: ActorContext = context + } + override def actorRefFactory: ActorContext = context + + val dagrService: DagrApiService = new DagrApiService(taskInfoTracker) with ActorRefFactoryContext + val possibleRoutes: Route = options{ complete(OK) } ~ dagrService.routes + + def receive: Actor.Receive = runRoute(possibleRoutes) + + implicit def routeExceptionHandler(implicit log: LoggingContext) = + ExceptionHandler { + case e: IllegalArgumentException => complete(BadRequest, e.getMessage) + case ex: Throwable => + logger.error(ex.getMessage) + complete(InternalServerError, s"Something went wrong, but the error is unspecified.") + } + + implicit val routeRejectionHandler = + RejectionHandler { + case MalformedRequestContentRejection(message, cause) :: _ => complete(BadRequest, message) + } +} + +object DagrApiService { + val version: String = "v1" // TODO: match versions + val root: String = "service" +} + +/** Defines the possible routes for the Dagr service */ +abstract class DagrApiService(taskInfoTracker: TaskInfoTracker) extends HttpService with PerRequestCreator { + import DagrApiService._ + + def routes: Route = versionRoute ~ taskScriptRoute ~ taskLogRoute ~ 
infoRoute + + def versionRoute: Route = { + path(root / "version") { + pathEnd { + get { + requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrVersionRequest()) + } + } + } + } + + def taskScriptRoute: Route = { + path(root / version / "script" / IntNumber) { (id) => + get { + Try(TaskId(id)) match { + case Success(taskId) => + requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrTaskScriptRequest(taskId)) + case Failure(ex) => complete(StatusCodes.BadRequest) + } + } + } + } + + def taskLogRoute: Route = { + path(root / version / "log" / IntNumber) { (id) => + get { + Try(TaskId(id)) match { + case Success(taskId) => + requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrTaskLogRequest(taskId)) + case Failure(ex) => complete(StatusCodes.BadRequest) + } + } + } + } + + def infoRoute: Route = { + pathPrefix(root / version / "info") { + pathEnd { + get { + requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrStatusRequest()) + } + } ~ + pathPrefix(IntNumber) { (id) => + get { + Try(TaskId(id)) match { + case Success(taskId) => + requestContext => perRequest(requestContext, DagrApiHandler.props(taskInfoTracker), DagrApiHandler.DagrTaskInfoRequest(taskId)) + case Failure(ex) => complete(StatusCodes.BadRequest) + } + } + } + } + } +} \ No newline at end of file diff --git a/webservice/src/main/scala/dagr/webservice/DagrServer.scala b/webservice/src/main/scala/dagr/webservice/DagrServer.scala new file mode 100644 index 00000000..da9e0489 --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/DagrServer.scala @@ -0,0 +1,122 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in 
the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package dagr.webservice + +import akka.actor.ActorSystem +import akka.io.IO +import akka.io.Tcp.CommandFailed +import akka.pattern._ +import akka.util.Timeout +import com.fulcrumgenomics.commons.util.LazyLogging +import dagr.core.DagrDef.TaskId +import dagr.core.exec.Executor +import dagr.core.reporting.ReportingDef.TaskLogger +import dagr.core.tasksystem.Task.{TaskInfoLike, TaskStatus} +import spray.can.Http + +import scala.collection.mutable +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Try + + +/** A little class to store all the infos created by the execution system. 
*/ +class TaskInfoTracker(executor: Executor) extends TaskLogger { + private val _infos = mutable.TreeSet[TaskInfoLike]() + private val _idToInfo = mutable.HashMap[TaskId,TaskInfoLike]() + + /** Returns the task status by ordinal */ + def statusFrom(ordinal: Int): TaskStatus = executor.statusFrom(ordinal) + + def record(info: TaskInfoLike): Unit = { + // Add the info (only once) + if (!_infos.contains(info)) _infos += info + // Add an entry only when the id is defined + info.id.foreach { id => + if (!this._idToInfo.contains(id)) this._idToInfo(id) = info + } + } + + def info(id: TaskId): Option[TaskInfoLike] = this._idToInfo.get(id) + + def infos: Iterable[TaskInfoLike] = this._infos + + // TODO: different methods of getting the info... +} + +/** Dagr web service */ +class DagrServer(executor: Executor, host: Option[String], port: Option[Int]) extends dagr.core.config.Configuration with LazyLogging { + implicit val actorSystem = ActorSystem("dagr") + + private val taskInfoTracker = new TaskInfoTracker(executor) + + private val _host = host.getOrElse { + optionallyConfigure[String](Configuration.Keys.WebServiceHost).getOrElse("0.0.0.0") + } + + private val _port = port.getOrElse { + optionallyConfigure[String](Configuration.Keys.WebSErvicePort).getOrElse("8080").toInt + } + + /** The logger to track all the task infos. */ + def taskLogger: TaskLogger = this.taskInfoTracker + + def startAllServices() { + startWebServiceActors() + } + + def stopAllServices() { + stopAndCatchExceptions(stopWebServiceActors()) + } + + private def startWebServiceActors() = { + implicit val bindTimeout: Timeout = 120.seconds + val service = actorSystem.actorOf(DagrApiServiceActor.props(taskInfoTracker), "dagr-actor") + Await.result(IO(Http) ? 
Http.Bind(service, interface = this._host, port = this._port), bindTimeout.duration) match { + case CommandFailed(b: Http.Bind) => + logger.error(s"Unable to bind to port ${this._port} on interface ${this._host}") + actorSystem.shutdown() + stopAndExit() + case _ => logger.info("Actor system started.") + } + } + + private def stopWebServiceActors() { + IO(Http) ! Http.CloseAll + } + + private def stopAndExit() { + logger.info("Stopping all services and exiting.") + stopAllServices() + logger.info("Services stopped") + throw new RuntimeException("Errors were found while initializing Dagr. This server will shutdown.") + } + + private def stopAndCatchExceptions(closure: => Unit) { + Try(closure).recover { + case ex: Throwable => logger.error("Exception ignored while shutting down.", ex) + } + } +} diff --git a/webservice/src/main/scala/dagr/webservice/DagrServerMain.scala b/webservice/src/main/scala/dagr/webservice/DagrServerMain.scala new file mode 100644 index 00000000..5b9dd584 --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/DagrServerMain.scala @@ -0,0 +1,112 @@ +/* + * The MIT License + * + * Copyright (c) 2017 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +package dagr.webservice + +import java.nio.file.Path + +import com.fulcrumgenomics.commons.CommonsDef.FilePath +import com.fulcrumgenomics.commons.util.LogLevel +import com.fulcrumgenomics.sopt.arg +import dagr.core.cmdline.{DagrCoreArgs, DagrCoreMain} +import dagr.core.exec._ +import dagr.core.tasksystem.Pipeline + +import scala.concurrent.ExecutionContext +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +object DagrServerMain extends dagr.core.config.Configuration { + /** The packages we wish to include in our command line **/ + protected def getPackageList: List[String] = { + val config = new dagr.core.config.Configuration {} + config.optionallyConfigure[List[String]](dagr.core.config.Configuration.Keys.PackageList) getOrElse List[String]("dagr") + } + + /** The main method */ + def main(args: Array[String]): Unit = { + new DagrServerMain[DagrServerArgs]().makeItSoAndExit(args) + } +} + +class DagrServerMain[T<:DagrCoreArgs:TypeTag:ClassTag] extends DagrCoreMain[T] + +class DagrServerArgs( + // TODO: update args with precedence information + @arg(doc = "Load in a custom configuration into Dagr. 
See https://github.com/typesafehub/config for details on the file format.") + override val config: Option[Path] = None, + @arg(doc = "Stop pipelines immediately on detecting the first failed task.") + override val failFast: Boolean = false, + @arg(doc = "Overrides the default scripts directory in the configuration file.") + override val scriptDir: Option[Path] = None, + @arg(doc = "Overrides default log directory in the configuration file.") + override val logDir: Option[Path] = None, + @arg(doc = "Set the logging level.", flag='l') + override val logLevel: LogLevel = LogLevel.Info, + @arg(doc = "Dagr scala scripts to compile and add to the list of programs (must end with .dagr or .scala).", minElements=0) + override val scripts: List[Path] = Nil, + @arg(doc = "Set the number of cores available to dagr.") + override val cores: Option[Double] = None, + @arg(doc = "Set the memory available to dagr.") + override val memory: Option[String] = None, + @arg(doc = "Write an execution report to this file, otherwise write to the stdout") + override val report: Option[Path] = None, + @arg(doc = "Provide an top-like interface for tasks with the give delay in seconds. This suppress info logging.") + interactive: Boolean = false, + @arg(doc = "Use the experimental execution system.") + override val experimentalExecution: Boolean = false, + @arg(doc = "Attempt to replay using the provided replay log") + override val replayLog: Option[FilePath] = None, + @arg(doc = "Run the Dagr web-service. 
Dagr will not exit once all tasks have completed.") + val webservice: Boolean = false, + @arg(doc = "The host name for the web-service.") + val host: Option[String] = None, + @arg(doc = "The port for teh web-service.") + val port: Option[Int] = None + ) extends DagrCoreArgs(config, failFast, scriptDir, logDir, logLevel, scripts, cores, memory, report, interactive, experimentalExecution, replayLog) { + + override protected[dagr] def configure(pipeline: Pipeline, commandLine: Option[String])(implicit ex: ExecutionContext): Unit = { + super.configure(pipeline, commandLine) + } + + override protected def executeSetup(executor: Executor, report: FilePath): Unit = { + super.executeSetup(executor, report) + + // Set up the web service + if (webservice) { + val server = new DagrServer(executor, this.host, this.port) + server.startAllServices() + sys addShutdownHook server.stopAllServices() + executor.withReporter(server.taskLogger) + } + } + + override protected def executeFinish(executor: Executor, report: FilePath): Unit = { + super.executeFinish(executor, report) + if (webservice) { + while (true) Thread.sleep(500) + } + } +} diff --git a/webservice/src/main/scala/dagr/webservice/PerRequest.scala b/webservice/src/main/scala/dagr/webservice/PerRequest.scala new file mode 100644 index 00000000..346e8ca8 --- /dev/null +++ b/webservice/src/main/scala/dagr/webservice/PerRequest.scala @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2015, Broad Institute, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. 
+ * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name Broad Institute, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE + */ + +package dagr.webservice + +import akka.actor.SupervisorStrategy.Stop +import akka.actor.{OneForOneStrategy, _} +import dagr.webservice.PerRequest.{WithProps, RequestCompleteWithHeaders, RequestComplete} +import spray.http.StatusCodes._ +import spray.http._ +import spray.httpx.marshalling.ToResponseMarshaller +import spray.routing.RequestContext + +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** + * This actor controls the lifecycle of a request. It is responsible for forwarding the initial message + * to a target handling actor. This actor waits for the target actor to signal completion (via a message), + * timeout, or handle an exception. 
It is this actor's responsibility to respond to the request and
 * to shut down itself and its child actors.
 *
 * Request completion is signaled by sending this actor one of:
 * 1) a RequestComplete message, which carries the response and its marshaller
 * 2) a RequestCompleteWithHeaders message, which additionally carries response headers
 *
 * Any other message is logged as an error and answered with InternalServerError; a receive
 * timeout is answered with GatewayTimeout.
 */
trait PerRequest extends Actor {
  import context._

  /** The request context that this actor completes exactly once. */
  def r: RequestContext
  /** The actor that receives `message` when this actor is constructed. */
  def target: ActorRef
  /** The message forwarded to `target`. */
  def message: AnyRef
  /** How long to wait for a reply before answering with GatewayTimeout. */
  def timeout: Duration

  setReceiveTimeout(timeout)
  target ! message

  def receive: PartialFunction[Any, Unit] = {
    // The [Any] type parameter appears to be required for versions of Scala > 2.11.2;
    // the @unchecked is required to muzzle erasure warnings.
    case message: RequestComplete[Any] @unchecked =>
      complete(message.response)(message.marshaller)
    case message: RequestCompleteWithHeaders[Any] @unchecked =>
      complete(message.response, message.headers: _*)(message.marshaller)
    case ReceiveTimeout =>
      complete(GatewayTimeout)
    case x =>
      // String interpolation renders a null message as "null", so no explicit null guard is needed.
      system.log.error(s"Unsupported response message sent to PerRequest actor: $x")
      complete(InternalServerError)
  }

  /**
   * Completes the request with the given response (plus any extra headers) and stops this actor.
   *
   * @param response   the response to send to the caller
   * @param headers    headers appended to those already mapped onto the request context
   * @param marshaller the marshaller used to render the response
   * @tparam T the type of the response
   */
  private def complete[T](response: T, headers: HttpHeader*)(implicit marshaller: ToResponseMarshaller[T]): Unit = {
    r.withHttpResponseHeadersMapped(existing => existing ++ headers).complete(response)
    stop(self)
  }

  /**
   * Any failure in a child is reported to the caller as InternalServerError. The failing child is
   * stopped, and so is this actor: the request has been answered, so there is nothing left to wait
   * for. Without stopping here the actor would linger until the receive timeout and then try to
   * complete the same request a second time.
   */
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy() {
      case e =>
        system.log.error(e, s"error processing request: ${r.request.uri}")
        // getMessage may be null; fall back to the exception class name so the body is never null.
        val detail = Option(e.getMessage).getOrElse(e.getClass.getName)
        r.complete(InternalServerError, detail)
        stop(self)
        Stop
    }
}

object PerRequest {
  sealed trait PerRequestMessage

  /**
   * Report complete, follows same pattern as spray.routing.RequestContext.complete; examples of how to call
   * that method should apply here too. E.g. even though this method has only one parameter, it can be called
   * with 2 where the first is a StatusCode: RequestComplete(StatusCode.Created, response)
   */
  case class RequestComplete[T](response: T)(implicit val marshaller: ToResponseMarshaller[T]) extends PerRequestMessage

  /**
   * Report complete with response headers. To respond with a special status code the first parameter can be a
   * tuple where the first element is StatusCode: RequestCompleteWithHeaders((StatusCode.Created, results), header).
   * Note that this is here so that RequestComplete above can behave like spray.routing.RequestContext.complete.
   */
  case class RequestCompleteWithHeaders[T](response: T, headers: HttpHeader*)(implicit val marshaller: ToResponseMarshaller[T]) extends PerRequestMessage

  /** Allows for pattern matching with extraction of the marshaller. */
  private object RequestComplete_ {
    def unapply[T](requestComplete: RequestComplete[T]): Some[(T, ToResponseMarshaller[T])] =
      Some((requestComplete.response, requestComplete.marshaller))
  }

  /** Allows for pattern matching with extraction of the headers and the marshaller. */
  private object RequestCompleteWithHeaders_ {
    def unapply[T](requestComplete: RequestCompleteWithHeaders[T]): Some[(T, Seq[HttpHeader], ToResponseMarshaller[T])] =
      Some((requestComplete.response, requestComplete.headers, requestComplete.marshaller))
  }

  /**
   * A PerRequest actor whose target is a child actor created from `props` and named `name`.
   */
  case class WithProps(r: RequestContext, props: Props, message: AnyRef, timeout: Duration, name: String) extends PerRequest {
    // Must be lazy: the PerRequest trait body sends `message` to `target` during construction,
    // before a strict val declared here would have been initialised.
    lazy val target: ActorRef = context.actorOf(props, name)
  }
}

/**
 * Provides factory methods for creating per request actors
 */
trait PerRequestCreator {
  implicit def actorRefFactory: ActorRefFactory

  /**
   * Creates a PerRequest actor that forwards `message` to a child built from `props`.
   *
   * @param r       the request context to complete
   * @param props   the props of the actor that handles `message`
   * @param message the message sent to the handling actor
   * @param timeout how long to wait for a reply before the request is answered with a timeout
   * @param name    the name used both for the per-request actor and for its handling child
   * @return the reference to the newly created per-request actor
   */
  def perRequest(r: RequestContext,
                 props: Props,
                 message: AnyRef,
                 timeout: Duration = 1.minute,
                 name: String = PerRequestCreator.endpointActorName): ActorRef = {
    actorRefFactory.actorOf(Props(WithProps(r, props, message, timeout, name)), name)
  }
}

object PerRequestCreator {
  /*
    This is yucky. For lack of a better idea on how to name the individual endpoint actors, the name is
    built from a stack-frame method name plus System.nanoTime().
    NOTE(review): element 1 of the stack trace is presumably this method's own frame rather than the
    caller's, in which case the method-name part is constant and only nanoTime() distinguishes names;
    confirm before relying on the name to identify the endpoint.
   */
  def endpointActorName: String =
    "Endpoint-" + java.lang.Thread.currentThread.getStackTrace()(1).getMethodName + System.nanoTime()
}