Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
web 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Jun 26, 2017
1 parent 286b795 commit 57ac01f
Show file tree
Hide file tree
Showing 26 changed files with 1,032 additions and 47 deletions.
29 changes: 26 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ lazy val core = Project(id="dagr-core", base=file("core"))
"com.fulcrumgenomics" %% "commons" % "0.2.0-SNAPSHOT",
"com.fulcrumgenomics" %% "sopt" % "0.2.0-SNAPSHOT",
"com.github.dblock" % "oshi-core" % "3.3",
"com.beachape" %% "enumeratum" % "1.5.12",
"com.beachape" %% "enumeratum" % "1.5.12",
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scala-lang" % "scala-compiler" % scalaVersion.value,
"org.reflections" % "reflections" % "0.9.10",
Expand Down Expand Up @@ -164,6 +164,29 @@ lazy val pipelines = Project(id="dagr-pipelines", base=file("pipelines"))
.disablePlugins(sbtassembly.AssemblyPlugin)
.dependsOn(tasks, core)

////////////////////////////////////////////////////////////////////////////////////////////////
// webservice project
////////////////////////////////////////////////////////////////////////////////////////////////
val akkaV = "2.3.9"
val sprayV = "1.3.3"
lazy val webservice = Project(id="dagr-webservice", base=file("webservice"))
.settings(commonSettings: _*)
.settings(unidocSettings: _*)
.settings(assemblySettings: _*)
.settings(description := "A tool to execute tasks in directed acyclic graphs.")
.settings(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"io.spray" %% "spray-can" % sprayV,
"io.spray" %% "spray-routing" % sprayV,
"io.spray" %% "spray-client" % sprayV,
"io.spray" %% "spray-http" % sprayV,
"io.spray" %% "spray-json" % sprayV
)
)
.aggregate(core, tasks, pipelines)
.dependsOn(core, tasks, pipelines)

////////////////////////////////////////////////////////////////////////////////////////////////
// root (dagr) project
////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -176,8 +199,8 @@ lazy val root = Project(id="dagr", base=file("."))
.settings(unidocSettings: _*)
.settings(assemblySettings: _*)
.settings(description := "A tool to execute tasks in directed acyclic graphs.")
.aggregate(core, tasks, pipelines)
.dependsOn(core, tasks, pipelines)
.aggregate(core, tasks, pipelines, webservice) // FIXME: should not depend on webservice
.dependsOn(core, tasks, pipelines, webservice)

////////////////////////////////////////////////////////////////////////////////////////////////
// Merge strategy for assembly
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/dagr/core/DagrDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ package dagr.core
object DagrDef {
/** The type of identifier used to uniquely identify tasks tracked by the execution system. */
type TaskId = BigInt

/** Companion methods for TaskId */
object TaskId {
/** The apply method for TaskId */
def apply(value: Int): TaskId = BigInt(value)
}
}
49 changes: 31 additions & 18 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,21 @@ import dagr.core.tasksystem.Pipeline

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Success


object DagrCoreMain extends Configuration {
/** The packages we wish to include in our command line **/
protected def getPackageList: List[String] = {
val config = new Configuration {}
config.optionallyConfigure[List[String]](Configuration.Keys.PackageList) getOrElse List[String]("dagr")
}

/** The main method */
/** The main method */
def main(args: Array[String]): Unit = {
new DagrCoreMain().makeItSoAndExit(args)
new DagrCoreMain[DagrCoreArgs]().makeItSoAndExit(args)
}

/** Provide a command line validation error message */
Expand Down Expand Up @@ -132,7 +134,7 @@ class DagrCoreArgs(
}

// Invoked by DagrCommandLineParser after the pipeline has also been instantiated
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = {
protected[dagr] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = {
try {
val config = new Configuration { }

Expand Down Expand Up @@ -178,16 +180,7 @@ class DagrCoreArgs(
}
}

/**
* Attempts to setup the various directories needed to executed the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

// Get the executor
val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()"))

protected def executeSetup(executor: Executor, report: FilePath): Unit = {
// Set up an interactive logger if desired and supported
if (this.interactive) {
if (Terminal.supportsAnsi) {
Expand Down Expand Up @@ -215,22 +208,42 @@ class DagrCoreArgs(
this.replayLog.foreach { log =>
executor.withReporter(TaskCache(log))
}
}

// execute
val exitCode = executor.execute(pipeline)

protected def executeFinish(executor: Executor, report: FilePath): Unit = {
// Write out the execution report
if (!interactive || Io.StdOut != report) {
val pw = new PrintWriter(Io.toWriter(report))
executor.logReport({ str: String => pw.write(str + "\n") })
pw.close()
}

}

/**
* Attempts to setup the various directories needed to executed the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

// Get the executor
val executor = this.executor.getOrElse(throw new IllegalStateException("Executor was not configured, did you all configure()"))

// Set up any task prior to execution
executeSetup(executor, report)

// execute
val exitCode = executor.execute(pipeline)

// complete any shutdown tasks after execution
executeFinish(executor, report)

exitCode
}
}

class DagrCoreMain extends LazyLogging {
class DagrCoreMain[Args<:DagrCoreArgs:TypeTag:ClassTag] extends LazyLogging {
protected def name: String = "dagr"

/** A main method that invokes System.exit with the exit code. */
Expand All @@ -250,7 +263,7 @@ class DagrCoreMain extends LazyLogging {

val startTime = System.currentTimeMillis()
val packages = Sopt.find[Pipeline](packageList, includeHidden=includeHidden)
val exit = Sopt.parseCommandAndSubCommand[DagrCoreArgs,Pipeline](name, args, packages) match {
val exit = Sopt.parseCommandAndSubCommand[Args,Pipeline](name, args, packages) match {
case Sopt.Failure(usage) =>
System.err.print(usage())
1
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/exec/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ trait Executor extends FinalStatusReporter {
}

/** Returns the task status by ordinal */
def from(ordinal: Int): TaskStatus
def statusFrom(ordinal: Int): TaskStatus

/** Returns the log directory. */
def logDir: DirPath
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/exec/TaskCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class SimpleTaskCache(replayLogLines: Seq[String], source: Option[String] = None
require(task._executor.isDefined, s"Executor not defined for task '${task.name}'")
this.statuses.filter { status => status.definitionCode == previousDefinition.code }
.sortBy(-_.statusOrdinal)
.map(status => task._executor.map(_.from(status.statusOrdinal).success).getOrElse(unreachable(s"Executor not set for task '${task.name}'")))
.map(status => task._executor.map(_.statusFrom(status.statusOrdinal).success).getOrElse(unreachable(s"Executor not set for task '${task.name}'")))
.headOption match {
case Some(s) if s => Unit // if it previously succeeded, don't set it to execute!
case _ => setToExecute(task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,5 @@ private[execsystem] trait TaskManagerLike extends Executor {
def stepExecution(): (Traversable[Task], Traversable[Task], Traversable[Task], Traversable[Task])

/** Returns the task status by ordinal */
final def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal)
final def statusFrom(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal)
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ trait GraphExecutor[T<:Task] extends Executor {
def resources: Option[SystemResources] = Some(this.taskExecutor.resources)

/** Returns the task status by ordinal */
final def from(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal)
final def statusFrom(ordinal: Int): TaskStatus = TaskStatus.withValue(ordinal)

/** Returns the executor that execute tasks. */
protected def taskExecutor: TaskExecutor[T]
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/dagr/core/reporting/ExecutionLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.fulcrumgenomics.commons.io.Io
import dagr.core.reporting.ExecutionLogger.{Definition, Relationship, Status}
import dagr.core.reporting.ReportingDef._
import dagr.core.tasksystem.Task
import dagr.core.tasksystem.Task.{TaskStatus, TaskInfo => RootTaskInfo}
import dagr.core.tasksystem.Task.{TaskInfoLike, TaskStatus, TaskInfo => RootTaskInfo}

import scala.collection.mutable

Expand Down Expand Up @@ -167,7 +167,7 @@ class ExecutionLogger(log: FilePath) extends TaskLogger with TaskRegister with A
}

/** The method that will be called with updated task information. */
def record(info: RootTaskInfo): Unit = logStatusChange(info)
def record(info: TaskInfoLike): Unit = logStatusChange(info)

def close(): Unit = if (!this.closed) {
this.writer.flush()
Expand Down Expand Up @@ -203,7 +203,7 @@ class ExecutionLogger(log: FilePath) extends TaskLogger with TaskRegister with A
}

/** Logs the a status update for the task. */
private def logStatusChange(rootInfo: RootTaskInfo): Unit = {
private def logStatusChange(rootInfo: TaskInfoLike): Unit = {
val definition = this.taskToDefinition(rootInfo.task)
write(Status(rootInfo.status, definition).toString + "\n")
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/dagr/core/reporting/ReportingDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
package dagr.core.reporting

import dagr.core.tasksystem.Task
import dagr.core.tasksystem.Task.TaskInfo
import dagr.core.tasksystem.Task.TaskInfoLike

object ReportingDef {

Expand All @@ -36,7 +36,7 @@ object ReportingDef {
/** Base trait for all classes interested in when the task status changes for any task. */
trait TaskLogger extends TaskReporter {
/** The method that will be called with updated task information. */
def record(info: TaskInfo): Unit
def record(info: TaskInfoLike): Unit
}

/** Base trait for all classes interested in when a new task is built by another task (ex.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ package dagr.core.reporting

import com.fulcrumgenomics.commons.util.{LazyLogging, Logger}
import dagr.core.reporting.ReportingDef.TaskLogger
import dagr.core.tasksystem.Task.{TaskInfo => RootTaskInfo}
import dagr.core.tasksystem.Task.{TaskInfoLike, TaskInfo => RootTaskInfo}

/** A simple logger that delegates to [[dagr.core.tasksystem.Task.TaskInfo#logTaskMessage]]. */
class TaskStatusLogger extends TaskLogger {
Expand All @@ -36,5 +36,5 @@ class TaskStatusLogger extends TaskLogger {
override lazy val logger: Logger = new Logger(getClass)
}
private val logger = new Dagr().logger
def record(info: RootTaskInfo): Unit = info.logTaskMessage(this.logger)
def record(info: TaskInfoLike): Unit = info.logTaskMessage(this.logger)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import dagr.core.exec.{Executor, SystemResources}
import dagr.core.execsystem.TaskManager
import dagr.core.execsystem2.GraphExecutor
import dagr.core.reporting.ReportingDef.TaskLogger
import dagr.core.tasksystem.Task.TaskInfo
import dagr.core.tasksystem.Task.{TaskInfo, TaskInfoLike}
import dagr.core.tasksystem.{InJvmTask, ProcessTask, Task, UnitTask}

import scala.collection.mutable
Expand Down Expand Up @@ -318,7 +318,7 @@ trait TopLikeStatusReporter extends TaskLogger with Terminal {
}

/** This method is called when any info about a task is updated. */
final def record(info: TaskInfo): Unit = {
final def record(info: TaskInfoLike): Unit = {
// add the task to the set of known tasks
this._tasks += info.task
// refresh the screen
Expand Down
61 changes: 56 additions & 5 deletions core/src/main/scala/dagr/core/tasksystem/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,59 @@ object Task {
override def toString: String = this.description
}

/** The execution information for a task. Used for extrenal read-only access to [[TaskInfo]]. Any execution system
* should extend [[TaskInfo]] instead class to store their specific metadata. */
trait TaskInfoLike extends Ordered[TaskInfoLike] {
def task : Task
def id : Option[TaskId]
def attempts : Int
def script : Option[FilePath]
def log : Option[FilePath]
def resources : Option[ResourceSet]
def exitCode : Option[Int]
def throwable : Option[Throwable]

/** The current status of the task. */
def status : TaskStatus

/** The instant the task reached a given status. */
def timePoints : Traversable[TimePoint]

/** The instant the task reached the current status. */
def statusTime : Instant

private[core] def logTaskMessage(logger: Logger)

def compare(that: TaskInfoLike): Int = {
(this.id, that.id) match {
case (Some(_), None) => -1
case (None, Some(_)) => 1
case (None, None) => this.status.ordinal - that.status.ordinal
case (Some(l), Some(r)) => (l - r).toInt
}
}
}

object TimePoint {
def parse(s: String, f: Int => TaskStatus): TimePoint = {
s.split(',').toList match {
case _ :: ordinal :: instant :: _ =>
TimePoint(
status = f(ordinal.toInt),
instant = Instant.parse(instant)
)
case _ =>
throw new IllegalArgumentException(s"Could not parse TimePoint '$s'")
}
}
}

/** A tuple representing the instant the task was set to the given status. */
private[core] case class TimePoint(status: TaskStatus, instant: Instant)
case class TimePoint(status: TaskStatus, instant: Instant) {
override def toString: String = {
s"${this.status.name},${this.status.ordinal},${this.instant.toString},${this.status.description}"
}
}

/** Execution information associated with a task. Any execution system should extend this class to store
* their specific metadata.
Expand Down Expand Up @@ -90,7 +141,7 @@ object Task {
var resources : Option[ResourceSet] = None,
var exitCode : Option[Int] = None,
var throwable : Option[Throwable] = None
) {
) extends TaskInfoLike {

if (attempts < 1) throw new RuntimeException("attempts must be greater than zero")

Expand Down Expand Up @@ -295,7 +346,7 @@ trait Task extends Dependable {

/** The execution information about this task, or None if not being executed. */
private[core] var _taskInfo : Option[TaskInfo] = None
private[core] def taskInfo : TaskInfo = this._taskInfo.get
private[dagr] def taskInfo : TaskInfo = this._taskInfo.get
private[core] def taskInfo_=(info: TaskInfo) = {
this._taskInfo = Some(info)
this._executor.foreach(_.record(info))
Expand All @@ -316,10 +367,10 @@ trait Task extends Dependable {
private val dependedOnByTasks = new ListBuffer[Task]()

/** Gets the sequence of tasks that this task depends on.. */
protected[core] def tasksDependedOn: Traversable[Task] = this.dependsOnTasks.toList
protected[dagr] def tasksDependedOn: Traversable[Task] = this.dependsOnTasks.toList

/** Gets the sequence of tasks that depend on this task. */
protected[core] def tasksDependingOnThisTask: Traversable[Task] = this.dependedOnByTasks.toList
protected[dagr] def tasksDependingOnThisTask: Traversable[Task] = this.dependedOnByTasks.toList

/** Must be implemented to handle the addition of a dependent. */
override def addDependent(dependent: Dependable): Unit = dependent.headTasks.foreach(t => {
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/dagr/core/exec/SimpleTaskCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ class SimpleTaskCacheTest extends FutureUnitSpec {
private val executor: Executor = Executor(experimentalExecution=true, resources=SystemResources.infinite)

/** The status with the lowest ordinal that does not represent a successful execution status. */
private val failStatus: TaskStatus = Stream.from(0).map(i => executor.from(i)).find(!_.success).getOrElse(unreachable("No unsuccessful status found"))
private val failStatus: TaskStatus = Stream.from(0).map(i => executor.statusFrom(i)).find(!_.success).getOrElse(unreachable("No unsuccessful status found"))

/** The status with the lowest ordinal that represents a successful execution status. */
private val successfulStatus: TaskStatus = Stream.from(0).map(i => executor.from(i)).find(_.success).getOrElse(unreachable("No successful status found"))
private val successfulStatus: TaskStatus = Stream.from(0).map(i => executor.statusFrom(i)).find(_.success).getOrElse(unreachable("No successful status found"))

/** A simple implicit to set the default executor */
implicit private class WithExecutor[T <: Task](task: T) {
Expand Down Expand Up @@ -95,7 +95,7 @@ class SimpleTaskCacheTest extends FutureUnitSpec {

it should "fail if a task definition is not found for a status" in {
val definition = Definition.buildRootDefinition(new NoOpTask)
val status = Status(executor.from(0), definition)
val status = Status(executor.statusFrom(0), definition)
val lines = Seq(status).map(_.toString)
val exception = intercept[Exception] { new SimpleTaskCache(lines) }
exception.getMessage should include (s"missing a definition for task '${definition.code}' with status '$status'")
Expand Down
Loading

0 comments on commit 57ac01f

Please sign in to comment.