Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Refactor task and exec system prior to new execution system.
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed May 15, 2017
1 parent 81b0ed5 commit 5d4ce5f
Show file tree
Hide file tree
Showing 23 changed files with 778 additions and 478 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/cmdline/DagrCoreMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class DagrCoreArgs(

// return an exit code based on the number of non-completed tasks
taskMan.taskToInfoBiMapFor.count { case (_, info) =>
TaskStatus.isTaskNotDone(info.status, failedIsDone=false)
TaskStatus.notDone(info.status, failedIsDone=false)
}
}

Expand Down
70 changes: 42 additions & 28 deletions core/src/main/scala/dagr/core/execsystem/FinalStatusReporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,79 +24,93 @@

package dagr.core.execsystem

import com.fulcrumgenomics.commons.collection.BiMap
import java.time.{Duration, Instant}

import com.fulcrumgenomics.commons.util.SimpleCounter
import com.fulcrumgenomics.commons.util.StringUtil._
import com.fulcrumgenomics.commons.util.TimeUtil._
import dagr.core.tasksystem.Task
import dagr.core.tasksystem.Task.TaskInfo

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/** Provides a method to provide an execution report for a task tracker */
trait FinalStatusReporter {
this: TaskTracker =>

def tasks: Traversable[Task]

/** The header for the report */
private def reportHeader: List[String] = List(
"ID", "NAME", "STATUS", "CORES", "MEMORY",
"SUBMISSION_DATE", "START_DATE", "END_DATE",
"EXECUTION_TIME", "TOTAL_TIME",
"SCRIPT", "LOG", "ATTEMPT_INDEX", "DEBUG_STATE"
"SCRIPT", "LOG", "ATTEMPT_INDEX"
)

/** A row for the report. Each row is a given task. */
private def reportRow(taskInfo: TaskExecutionInfo): List[String] = {
private def reportRow(task: Task): List[String] = {
val info = task.taskInfo
val id: String = info.id.map(_.toString).getOrElse(task.name)
reportRow(info=info, id=id, submissionDate=info.submissionDate, startDate=info.startDate, endDate=info.endDate)
}

/** A row for the report. Each row is a given task. */
private def reportRow(info: TaskInfo,
id: String,
submissionDate: Option[Instant],
startDate: Option[Instant],
endDate: Option[Instant]): List[String] = {
// get the total execution time, and total time since submission
val (executionTime: String, totalTime: String) = taskInfo.durationSinceStartAndFormat
val (executionTime: String, totalTime: String) = info.executionAndTotalTime

// The state of execution
val graphNodeState = graphNodeStateFor(taskInfo.taskId).map(_.toString).getOrElse("NA")
List(
taskInfo.taskId.toString(),
taskInfo.task.name,
taskInfo.status.toString,
f"${taskInfo.resources.cores.value}%.2f",
taskInfo.resources.memory.prettyString,
timestampStringOrNA(taskInfo.submissionDate),
timestampStringOrNA(taskInfo.startDate),
timestampStringOrNA(taskInfo.endDate),
id,
info.task.name,
info.status.toString,
f"${info.resources.map(_.cores.value).getOrElse(0.0)}%.2f",
info.resources.map(_.memory.prettyString).getOrElse(""),
timestampStringOrNA(submissionDate),
timestampStringOrNA(startDate),
timestampStringOrNA(endDate),
executionTime,
totalTime,
taskInfo.script.toFile.getAbsolutePath,
taskInfo.logFile.toFile.getAbsolutePath,
taskInfo.attemptIndex,
graphNodeState).map(_.toString)
info.script.map(_.toFile.getAbsolutePath).getOrElse(""),
info.log.map(_.toFile.getAbsolutePath).getOrElse(""),
info.attempts
).map(_.toString)
}


/** Writes a delimited string of the status of all tasks managed
*
* @param loggerMethod the method to use to write task status information, one line at a time
* @param delimiter the delimiter between entries in a row
*/
def logReport(loggerMethod: String => Unit, delimiter: String = " "): Unit = {
val taskInfoMap: BiMap[Task, TaskExecutionInfo] = taskToInfoBiMapFor
//val taskInfoMap: BiMap[Task, TaskExecutionInfo] = taskToInfoBiMapFor

// Create the task status table
val taskStatusTable: ListBuffer[List[String]] = new ListBuffer[List[String]]()
taskStatusTable.append(reportHeader)
// Create a map to collect the counts for each task status
val taskStatusMap: scala.collection.mutable.Map[TaskStatus.Value, Int] = mutable.HashMap[TaskStatus.Value, Int]()
TaskStatus.values.foreach(status => taskStatusMap.put(status, 0))
val counter = new SimpleCounter[Task.TaskStatus]()
// Go through every task
for (taskInfo <- taskInfoMap.values.toList.sortBy(taskInfo => taskInfo.taskId)) {
tasks.toList.sortBy(task => (task.taskInfo.id.getOrElse(BigInt(-1)), task.name)).foreach { task =>
val info = task.taskInfo
// Make a report row
taskStatusTable.append(reportRow(taskInfo))
taskStatusTable.append(reportRow(task))
// Update the task status counts
taskStatusMap.put(taskInfo.status, taskStatusMap.get(taskInfo.status).get + 1)
counter.count(info.status)
}

// Write the task status table
loggerMethod(columnIt(taskStatusTable.toList, delimiter))

// Create and write the task status counts
val taskStatusCountTable = new ListBuffer[List[String]]()
val keys = taskStatusMap.keys.toList.filter(status => taskStatusMap.getOrElse(status, 0) > 0)
val keys: List[Task.TaskStatus] = counter.map(_._1).toList
taskStatusCountTable.append(keys.map(_.toString))
taskStatusCountTable.append(keys.map(status => taskStatusMap.getOrElse(status, 0).toString))
taskStatusCountTable.append(keys.map(status => counter.countOf(status).toString))
loggerMethod("\n" + columnIt(taskStatusCountTable.toList, delimiter))
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/dagr/core/execsystem/GraphNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class GraphNode(var task: Task,
_predecessors ++= predecessorNodes

def taskId: TaskId = taskInfo.taskId
def taskInfo: TaskExecutionInfo = task.taskInfo
def taskInfo: TaskExecutionInfo = task.execsystemTaskInfo

/** Remove a predecessor from the execution graph.
*
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/dagr/core/execsystem/ResourceSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ object ResourceSet {
def apply(cores: Double, memory: Long): ResourceSet = new ResourceSet(Cores(cores), Memory(memory))

val empty = ResourceSet(0, 0)
val Inf = ResourceSet(Double.MaxValue, Long.MaxValue)
@deprecated("use `Inf` instead", since="0.1.3")
val infinite = ResourceSet(Double.MaxValue, Long.MaxValue)
}

Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/dagr/core/execsystem/TaskException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
*/
package dagr.core.execsystem

import dagr.core.execsystem.TaskStatus.TaskStatus

/**
* An exception that can be thrown when there is an error processing tasks, that encapsulates
* the thrown exception and a failure status to use.
Expand Down
40 changes: 14 additions & 26 deletions core/src/main/scala/dagr/core/execsystem/TaskExecutionInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,33 @@
*/
package dagr.core.execsystem

import java.nio.file.Path
import java.time.{Duration, Instant}

import com.fulcrumgenomics.commons.util.TimeUtil._
import dagr.core.DagrDef._
import dagr.core.execsystem.TaskStatus.Unknown
import dagr.core.tasksystem.Task
import com.fulcrumgenomics.commons.CommonsDef.FilePath

/** The state of execution of a [[Task]].
*
* @param task the task that will be executed.
* @param status the initial state of the task.
* @param script the path to the script where the task commands should be stored.
* @param logFile the path to the log file where the task stderr and stdout should be stored.
* @param submissionDate the submission date of the task, if any.
* @param resources the resources that the task was scheduled with.
* @param startDate the start date of the task, if any.
* @param endDate the end date of the task, if any.
* @param attemptIndex the one-based count of attempts to run this task.
*/
class TaskExecutionInfo(var task: Task,
class TaskExecutionInfo(task: Task,
var taskId: TaskId,
var status: TaskStatus.Value,
var script: Path,
var logFile: Path,
var submissionDate: Option[Instant],
var resources: ResourceSet = ResourceSet(0,0),
var startDate: Option[Instant] = None,
var endDate: Option[Instant] = None,
var attemptIndex: Int = 1 // one-based
) {
if (attemptIndex < 1) throw new RuntimeException("attemptIndex must be greater than zero")

task._taskInfo = Some(this)
initStatus: TaskStatus = Unknown,
script: FilePath,
log: FilePath,
resources: Option[ResourceSet] = Some(ResourceSet(0, 0)))
extends Task.TaskInfo(task=task, initStatus=initStatus, id=Some(taskId), script=Some(script), log=Some(log), resources=resources)
{
protected[core] var submissionDate: Option[Instant] = Some(Instant.now())
protected[core] var startDate: Option[Instant] = None
protected[core] var endDate: Option[Instant] = None

override def toString: String = {
val na: String = "NA"
s"STATUS[$status] ID[$taskId] NAME[${task.name}] SUBMITTED[${submissionDate.getOrElse(na)}]" +
s" START[${startDate.getOrElse(na)}] END[${endDate.getOrElse(na)}] ATTEMPT[$attemptIndex]" +
s" SCRIPT[$script] LOGFILE[$logFile]"
s" START[${startDate.getOrElse(na)}] END[${endDate.getOrElse(na)}] ATTEMPT[$attempts]" +
s" SCRIPT[$script] LOGFILE[$log]"
}

/** Gets the total execution time and total time since submission, in seconds, or None if the task has not started and ended. Formats
Expand Down
36 changes: 23 additions & 13 deletions core/src/main/scala/dagr/core/execsystem/TaskExecutionRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ private[core] trait TaskExecutionRunnerApi {
*/
def runningTaskIds: Iterable[TaskId]

/** Get the running tasks.
*
* @return true if the task is running, false otherwise
*/
def running(taskId: TaskId): Boolean

// NB: does the underlying process.destroy work?
/** Attempts to terminate a task's underlying process.
*
Expand Down Expand Up @@ -162,7 +168,7 @@ private[core] class TaskExecutionRunner extends TaskExecutionRunnerApi with Lazy
taskInfos.remove(taskId).isDefined
}

/** Start running a task. Call [[TaskExecutionRunner.completedTasks]] to see if subsequently completes.
/** Start running a task. Call [[TaskExecutionRunner.completedTasks]] to see if it subsequently completes.
*
* @param taskInfo the info associated with this task.
* @param simulate true if we are to simulate to run a task, false otherwise.
Expand All @@ -171,26 +177,26 @@ private[core] class TaskExecutionRunner extends TaskExecutionRunnerApi with Lazy
override def runTask(taskInfo: TaskExecutionInfo, simulate: Boolean = false): Boolean = taskInfo.task match {
case unitTask: UnitTask =>
try {
unitTask.applyResources(taskInfo.resources)
unitTask.applyResources(taskInfo.resources.get)
val taskRunner: TaskRunnable = (simulate, unitTask) match {
case (true, t: UnitTask) => new SimulatedTaskExecutionRunner(task = t)
case (false, t: InJvmTask) => new InJvmTaskExecutionRunner(task = t, script = taskInfo.script, logFile = taskInfo.logFile)
case (false, t: ProcessTask) => new ProcessTaskExecutionRunner(task = t, script = taskInfo.script, logFile = taskInfo.logFile)
case (false, t: InJvmTask) => new InJvmTaskExecutionRunner(task = t, script = taskInfo.script.get, logFile = taskInfo.log.get)
case (false, t: ProcessTask) => new ProcessTaskExecutionRunner(task = t, script = taskInfo.script.get, logFile = taskInfo.log.get)
case _ => throw new RuntimeException("Could not run a unknown type of task")
}
val thread = new Thread(taskRunner)
processes.put(taskInfo.taskId, thread)
taskRunners.put(taskInfo.taskId, taskRunner)
taskInfos.put(taskInfo.taskId, taskInfo)
thread.start()
taskInfo.status = TaskStatus.STARTED
taskInfo.status = TaskStatus.Started
taskInfo.startDate = Some(Instant.now())
true
}
catch {
case e: Exception =>
logger.exception(e, s"Failed schedule for [${unitTask.name}]: ")
taskInfo.status = TaskStatus.FAILED_SCHEDULING
taskInfo.status = TaskStatus.FailedScheduling
false
}
case _ => throw new RuntimeException("Cannot call runTask on tasks that are not 'UnitTask's")
Expand All @@ -205,13 +211,15 @@ private[core] class TaskExecutionRunner extends TaskExecutionRunnerApi with Lazy


// In case it has previously been stopped
if (TaskStatus.isTaskNotDone(taskInfo.status, failedIsDone=failedAreCompleted)) {
taskInfo.endDate = Some(Instant.now())
taskInfo.status = {
if ((0 == exitCode && onCompleteSuccessful) || failedAreCompleted) TaskStatus.SUCCEEDED
else if (0 != exitCode) TaskStatus.FAILED_COMMAND
else TaskStatus.FAILED_ON_COMPLETE // implied !onCompleteSuccessful
if (TaskStatus.notDone(taskInfo.status, failedIsDone=failedAreCompleted)) {
taskInfo.endDate = Some(Instant.now())
taskInfo.status = {
if ((0 == exitCode && onCompleteSuccessful) || failedAreCompleted) TaskStatus.SucceededExecution
else if (0 != exitCode) TaskStatus.FailedExecution
else TaskStatus.FailedOnComplete // implied !onCompleteSuccessful
}
taskInfo.exitCode = Some(exitCode)
taskInfo.throwable = throwable
}
throwable.foreach { thr =>
logger.error(
Expand Down Expand Up @@ -248,6 +256,8 @@ private[core] class TaskExecutionRunner extends TaskExecutionRunnerApi with Lazy
completedTasks.toMap
}

override def running(taskId: TaskId): Boolean = taskInfos.contains(taskId)

override def runningTaskIds: Iterable[TaskId] = {
processes.keys
}
Expand All @@ -262,7 +272,7 @@ private[core] class TaskExecutionRunner extends TaskExecutionRunnerApi with Lazy
// if it is alive, interrupt it
thread.interrupt()
thread.join(100) // just give it 0.1 of second
taskInfo.status = TaskStatus.STOPPED
taskInfo.status = TaskStatus.Stopped
}
taskInfo.endDate = Some(Instant.now())
!thread.isAlive // thread is still alive WTF
Expand Down
Loading

0 comments on commit 5d4ce5f

Please sign in to comment.