Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

New Exec system #278

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ lazy val commonSettings = Seq(
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", Option(System.getenv("TEST_HTML_REPORTS")).getOrElse(htmlReportsDirectory)),
testOptions in Test += Tests.Argument("-l", "LongRunningTest"), // ignores long running tests
// uncomment for full stack traces
//testOptions in Test += Tests.Argument("-oD"),
testOptions in Test += Tests.Argument("-oDF"),
fork in Test := true,
resolvers += Resolver.jcenterRepo,
resolvers += Resolver.sonatypeRepo("public"),
Expand Down
116 changes: 79 additions & 37 deletions core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,22 @@ import java.text.DecimalFormat
import com.fulcrumgenomics.commons.CommonsDef.unreachable
import com.fulcrumgenomics.commons.io.{Io, PathUtil}
import com.fulcrumgenomics.commons.util.{LazyLogging, LogLevel, Logger}
import com.fulcrumgenomics.sopt.Sopt.CommandSuccess
import com.fulcrumgenomics.sopt.cmdline.{CommandLineParser, CommandLineProgramParserStrings, ValidationException}
import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, ArgTokenizer, OptionParser}
import com.fulcrumgenomics.sopt.cmdline.{CommandLineProgramParserStrings, ValidationException}
import com.fulcrumgenomics.sopt.parsing.{ArgOptionAndValues, ArgTokenCollator, ArgTokenizer}
import com.fulcrumgenomics.sopt.util.TermCode
import com.fulcrumgenomics.sopt.{OptionName, Sopt, arg}
import com.fulcrumgenomics.sopt.{Sopt, arg}
import dagr.core.config.Configuration
import dagr.core.exec.{Cores, Memory}
import dagr.core.execsystem._
import dagr.core.execsystem2.GraphExecutor
import dagr.core.execsystem2.{TopLikeStatusReporter => TopLikeStatusReporter2}
import dagr.core.execsystem2.local.LocalTaskExecutor
import dagr.core.reporting.{FinalStatusReporter, PeriodicRefreshingReporter, Terminal}
import dagr.core.tasksystem.Pipeline

import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext
import scala.util.Success

object DagrCoreMain extends Configuration {
/** The packages we wish to include in our command line **/
Expand All @@ -52,7 +57,9 @@ object DagrCoreMain extends Configuration {

/** The main method */
/** The main method */
def main(args: Array[String]): Unit = new DagrCoreMain().makeItSoAndExit(args)
def main(args: Array[String]): Unit = {
// Delegate to a fresh DagrCoreMain; makeItSoAndExit calls System.exit with the exit code of the run.
new DagrCoreMain().makeItSoAndExit(args)
}

/** Provide a command line validation error message */
private[cmdline] def buildErrorMessage(msgOption: Option[String] = None, exceptionOption: Option[Exception] = None): String = {
Expand Down Expand Up @@ -103,11 +110,14 @@ class DagrCoreArgs(
@arg(doc = "Write an execution report to this file, otherwise write to the stdout")
val report: Option[Path] = None,
@arg(doc = "Provide an top-like interface for tasks with the give delay in seconds. This suppress info logging.")
var interactive: Boolean = false
var interactive: Boolean = false,
@arg(doc = "Use the experimental execution system.")
val experimentalExecution: Boolean = false
) extends LazyLogging {

// These are not optional, but are only populated during configure()
private var taskManager : Option[TaskManager] = None
private var taskExecutor: Option[LocalTaskExecutor] = None
private var reportPath : Option[Path] = None

// Initialize the configuration as early as possible
Expand All @@ -121,11 +131,11 @@ class DagrCoreArgs(
/** Try to create a given directory, and if there is an exception, write the path since some exceptions can be obtuse */
private def mkdir(dir: Path, use: String, errors: ListBuffer[String]): Unit = {
try { Files.createDirectories(dir) }
catch { case e: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
catch { case _: Exception => errors += DagrCoreMain.buildErrorMessage(Some(s"Could not create the $use directory: $dir")) }
}

// Invoked by DagrCommandLineParser after the pipeline has also been instantiated
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None) : Unit = {
private[cmdline] def configure(pipeline: Pipeline, commandLine: Option[String] = None)(implicit ex: ExecutionContext) : Unit = {
try {
val config = new Configuration { }

Expand All @@ -149,7 +159,12 @@ class DagrCoreArgs(
this.reportPath.foreach(p => Io.assertCanWriteFile(p, parentMustExist=false))

val resources = SystemResources(cores = cores.map(Cores(_)), totalMemory = memory.map(Memory(_)))
this.taskManager = Some(new TaskManager(taskManagerResources=resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
if (experimentalExecution) {
this.taskExecutor = Some(new LocalTaskExecutor(systemResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
}
else {
this.taskManager = Some(new TaskManager(taskManagerResources = resources, scriptsDirectory = scriptsDirectory, logDirectory = logDirectory))
}

// Print all the arguments if desired.
commandLine.foreach { line =>
Expand All @@ -168,52 +183,79 @@ class DagrCoreArgs(
* Attempts to set up the various directories needed to execute the pipeline, execute it, and generate
* an execution report.
*/
protected[cmdline] def execute(pipeline : Pipeline): Int = {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))
protected[cmdline] def execute(pipeline : Pipeline)(implicit ex: ExecutionContext): Int = {
val report = this.reportPath.getOrElse(throw new IllegalStateException("execute() called before configure()"))

val interactiveReporter: Option[TopLikeStatusReporter] = interactive match {
case true if !Terminal.supportsAnsi =>
logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
interactive = false
None
case true =>
val loggerOutputStream = new ByteArrayOutputStream()
val loggerPrintStream = new PrintStream(loggerOutputStream)
Logger.out = loggerPrintStream
Some(new TopLikeStatusReporter(taskMan, Some(loggerOutputStream), print = (s: String) => System.out.print(s)))
case false => None
// Fall back to non-interactive mode when the terminal does not support ANSI codes.
// This is a plain statement: it only logs a warning and clears the `interactive` flag.
if (interactive && !Terminal.supportsAnsi) {
  logger.warning("ANSI codes are not supported in your terminal. Interactive mode will not be used.")
  interactive = false
}

/** Points `Logger.out` at a new in-memory buffer and returns that buffer. */
def toLoggerOutputStream(): ByteArrayOutputStream = {
  val buffer = new ByteArrayOutputStream()
  // Wrap the buffer in a PrintStream, which is what gets assigned to Logger.out.
  Logger.out = new PrintStream(buffer)
  buffer
}
interactiveReporter.foreach(_.start())

taskMan.addTask(pipeline)
taskMan.runToCompletion(this.failFast)
val (finalStatusReporter: FinalStatusReporter, exitCode: Int) = if (experimentalExecution) {
val taskExecutor = this.taskExecutor.getOrElse(throw new IllegalStateException("execute() called before configure()"))
val graphExecutor = GraphExecutor(taskExecutor)
if (interactive) {
val reporter = new TopLikeStatusReporter2(
systemResources = taskExecutor.resources,
loggerOut = Some(toLoggerOutputStream()),
print = s => System.out.print(s)
)
graphExecutor.withLogger(reporter)
}
(graphExecutor, graphExecutor execute pipeline)
}
else {
val taskMan = this.taskManager.getOrElse(throw new IllegalStateException("execute() called before configure()"))

val interactiveReporter: Option[PeriodicRefreshingReporter] = if (!interactive) { None } else {
val reporter = new TaskManagerReporter(taskManager=taskMan)
Some(new PeriodicRefreshingReporter(reporter=reporter, loggerOut=Some(toLoggerOutputStream()), print = s => System.out.print(s)))
}
interactiveReporter.foreach(_.start())

taskMan.addTask(pipeline)
taskMan.runToCompletion(this.failFast)

interactiveReporter.foreach(_.shutdown())

// return an exit code based on the number of non-completed tasks
val code = taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.notDone(info.status, failedIsDone=false)
}
(taskMan, code)
}

// Write out the execution report
if (!interactive || Io.StdOut != report) {
val pw = new PrintWriter(Io.toWriter(report))
taskMan.logReport({ str: String => pw.write(str + "\n") })
finalStatusReporter.logReport({ str: String => pw.write(str + "\n") })
pw.close()
}

interactiveReporter.foreach(_.shutdown())

// return an exit code based on the number of non-completed tasks
taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.isTaskNotDone(info.status, failedIsDone=false)
}
exitCode
}

}

class DagrCoreMain extends LazyLogging {
protected def name: String = "dagr"

/** A main method that invokes System.exit with the exit code. */
def makeItSoAndExit(args: Array[String]): Unit = System.exit(makeItSo(args))
def makeItSoAndExit(args: Array[String]): Unit = {
  // Supply the global execution context as the implicit that makeItSo requires.
  implicit val executionContext: ExecutionContext = ExecutionContext.global
  val exitCode: Int = makeItSo(args)
  System.exit(exitCode)
}

/** A main method that returns an exit code instead of exiting. */
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false): Int = {
def makeItSo(args: Array[String], packageList: List[String] = DagrCoreMain.getPackageList, includeHidden: Boolean = false)
(implicit ex: ExecutionContext): Int = {
// Initialize color options
TermCode.printColor = DagrCoreMain.optionallyConfigure[Boolean](Configuration.Keys.ColorStatus).getOrElse(true)

Expand All @@ -226,7 +268,7 @@ class DagrCoreMain extends LazyLogging {
case Sopt.Failure(usage) =>
System.err.print(usage())
1
case Sopt.CommandSuccess(cmd) =>
case Sopt.CommandSuccess(_) =>
unreachable("CommandSuccess should never be returned by parseCommandAndSubCommand.")
case Sopt.SubcommandSuccess(dagr, pipeline) =>
val name = pipeline.getClass.getSimpleName
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/dagr/core/config/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import java.io.File
import java.nio.file.{Files, Path}
import java.time.Duration

import com.typesafe.config.{ConfigParseOptions, ConfigFactory, Config}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import com.typesafe.config.ConfigException.Generic
import com.fulcrumgenomics.commons.io.PathUtil._
import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.execsystem.{Cores, Memory}
import dagr.core.exec.{Cores, Memory}

import scala.collection.SortedSet
import scala.reflect.runtime.universe.{TypeTag, typeOf}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/
package dagr.core.execsystem
package dagr.core.exec

import dagr.core.tasksystem.{InJvmTask, ProcessTask, UnitTask}

Expand All @@ -31,10 +32,10 @@ class NaiveScheduler extends Scheduler {
* Takes the list of tasks that could be scheduled if their resource needs can be met and attempts
* to schedule a single task for execution.
*/
private[execsystem] def scheduleOneTask(readyTasks: Traversable[UnitTask],
remainingSystemCores: Cores,
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = {
private[exec] def scheduleOneTask(readyTasks: Traversable[UnitTask],
remainingSystemCores: Cores,
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = {
val systemResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingSystemMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
// Find the first task that can be executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/
package dagr.core.execsystem
package dagr.core.exec

import oshi.SystemInfo
import oshi.hardware.platform.mac.MacHardwareAbstractionLayer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/
package dagr.core.execsystem
package dagr.core.exec

import dagr.core.exec

/** Factory methods and well-known constants for [[ResourceSet]]. */
object ResourceSet {
  /** Builds a new [[ResourceSet]] holding the same core and memory values as `that`. */
  def apply(that: ResourceSet): ResourceSet = new ResourceSet(cores = Cores(that.cores.value), memory = Memory(that.memory.value))

  /** Builds a [[ResourceSet]] from a numeric core count and a numeric memory value. */
  def apply(cores: Double, memory: Long): ResourceSet = new ResourceSet(Cores(cores), Memory(memory))

  /** A resource set with zero cores and zero memory. */
  val empty: ResourceSet = ResourceSet(0, 0)

  /** A resource set built from `Double.MaxValue` cores and `Long.MaxValue` memory. */
  val Inf: ResourceSet = ResourceSet(Double.MaxValue, Long.MaxValue)

  /** Deprecated alias of [[Inf]], kept for source compatibility. */
  @deprecated("use `Inf` instead", since="0.1.3")
  val infinite: ResourceSet = Inf
}

Expand Down Expand Up @@ -66,7 +71,7 @@ case class ResourceSet(cores: Cores = Cores.none, memory: Memory = Memory.none)
}

/**
* Returns a [[dagr.core.execsystem.ResourceSet]] with the remaining resources after subtracting `other`
* Returns a [[exec.ResourceSet]] with the remaining resources after subtracting `other`
* if doing so would not generate negative resources. Otherwise returns `None`.
*/
def minusOption(other: ResourceSet) : Option[ResourceSet] = if (subsettable(other)) Some(this - other) else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/
package dagr.core.execsystem
package dagr.core.exec

import com.fulcrumgenomics.commons.util.LazyLogging
import dagr.core.tasksystem.{InJvmTask, ProcessTask, Task, UnitTask}
Expand All @@ -33,7 +34,7 @@ abstract class Scheduler extends LazyLogging {
* while running. Only schedules based on the resources available after subtracting resources from running tasks.
*
* @param runningTasks the tasks that are currently running.
* @param readyTasks the tasks that should be considered to be schedule.
* @param readyTasks the tasks that should be considered to be scheduled.
* @param systemCores the set of system cores.
* @param systemMemory the set of system memory.
* @param jvmMemory the set of JVM memory.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/execsystem/GraphNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class GraphNode(var task: Task,
_predecessors ++= predecessorNodes

def taskId: TaskId = taskInfo.taskId
def taskInfo: TaskExecutionInfo = task.taskInfo
def taskInfo: TaskExecutionInfo = task.execsystemTaskInfo

/** Remove a predecessor from the execution graph.
*
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/dagr/core/execsystem/TaskException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
*/
package dagr.core.execsystem

import dagr.core.execsystem.TaskStatus.TaskStatus

/**
* An exception that can be thrown when there is an error processing tasks, that encapsulates
* the thrown exception and a failure status to use.
Expand Down
44 changes: 18 additions & 26 deletions core/src/main/scala/dagr/core/execsystem/TaskExecutionInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,37 @@
*/
package dagr.core.execsystem

import java.nio.file.Path
import java.time.{Duration, Instant}

import com.fulcrumgenomics.commons.CommonsDef.FilePath
import com.fulcrumgenomics.commons.util.TimeUtil._
import dagr.core.DagrDef._
import dagr.core.exec.ResourceSet
import dagr.core.execsystem.TaskStatus.Unknown
import dagr.core.tasksystem.Task

/** The state of execution of a [[Task]].
*
* @param task the task that will be executed.
* @param status the initial state of the task.
* @param script the path to the script where the task commands should be stored.
* @param logFile the path to the log file where the task stderr and stdout should be stored.
* @param submissionDate the submission date of the task, if any.
* @param resources the resources that the task was scheduled with.
* @param startDate the start date of the task, if any.
* @param endDate the end date of the task, if any.
* @param attemptIndex the one-based count of attempts to run this task.
*/
class TaskExecutionInfo(var task: Task,
var taskId: TaskId,
var status: TaskStatus.Value,
var script: Path,
var logFile: Path,
var submissionDate: Option[Instant],
var resources: ResourceSet = ResourceSet(0,0),
var startDate: Option[Instant] = None,
var endDate: Option[Instant] = None,
var attemptIndex: Int = 1 // one-based
) {
if (attemptIndex < 1) throw new RuntimeException("attemptIndex must be greater than zero")
class TaskExecutionInfo(task: Task,
initId: TaskId,
initStatus: TaskStatus = Unknown,
script: FilePath,
log: FilePath,
resources: Option[ResourceSet] = Some(ResourceSet(0, 0)))
extends Task.TaskInfo(task=task, initStatus=initStatus, id=Some(initId), script=Some(script), log=Some(log), resources=resources)
{
protected[core] var submissionDate: Option[Instant] = Some(Instant.now())
protected[core] var startDate: Option[Instant] = None
protected[core] var endDate: Option[Instant] = None

task._taskInfo = Some(this)
def taskId: TaskId = this.id.get
def taskId_=(id: TaskId): Unit = this.id = Some(id)

override def toString: String = {
val na: String = "NA"
s"STATUS[$status] ID[$taskId] NAME[${task.name}] SUBMITTED[${submissionDate.getOrElse(na)}]" +
s" START[${startDate.getOrElse(na)}] END[${endDate.getOrElse(na)}] ATTEMPT[$attemptIndex]" +
s" SCRIPT[$script] LOGFILE[$logFile]"
s" START[${startDate.getOrElse(na)}] END[${endDate.getOrElse(na)}] ATTEMPT[$attempts]" +
s" SCRIPT[$script] LOGFILE[$log]"
}

/** Gets the total execution time and total time since submission, in seconds, or None if the task has not started and ended. Formats
Expand Down
Loading