Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
A few more updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Apr 27, 2017
1 parent 54add65 commit 4c5eec3
Showing 1 changed file with 87 additions and 61 deletions.
148 changes: 87 additions & 61 deletions core/src/main/scala/dagr/core/execsystem2/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
package dagr.core.execsystem2

import dagr.commons.util.LazyLogging
import dagr.core.tasksystem.Task
import dagr.core.tasksystem.{Task, UnitTask}

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -72,39 +72,37 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends
*
* The following describes the routing of each method. The tasks statuses are updated appropriately:
*
* [[execute(task)]]
* -> [[addToDependencyGraph()]]
* [[execute(task)]] // executes the task assuming it is the root of the graph
* -> [[processTask()]] // executes the task and its children end-to-end
*
* [[addToDependencyGraph()]]
* -> [[DependencyGraph.add]]
* -> *** BLOCK *** the future that is returned above only completes when the added task has no more dependencies
* -> [[buildTask()]]
* [[processTask()]] // executes a task end-to-end
* -> [[enqueue]] // add the task to the dependency graph and block until it has no more dependents
* -> [[buildAndExecute()]] // builds the task, and either executes it directly, or processed each child end-to-end
*
* [[buildTask()]]
* [[buildAndExecute()]] // builds and executes a task
* -> case 1: a unit task
* -> [[executeUnitTask()]]
* -> [[executeUnitTask()]] // execute the task
* -> case 2: a list of tasks (ex. Pipeline)
* -> [[addToDependencyGraph()]]
* -> [[processTask()]] // execute each child end-to-end
*
* [[executeUnitTask()]]
* -> [[submit()]]
* -> ** BLOCK *** the future only returns when the task can be executed
* -> [[execute(task, future)]]
* -> [[onComplete()]]
* -> [[finalizeTask()]]
* [[executeUnitTask()]] // handles the submission and execution of task
* -> [[submit()]] // submit the [[TaskExecutor]] and return when the task can be executed
* -> [[executeHandler()]] // execute the task with the [[TaskHandler]] and return when it has finished executing
* -> [[onComplete()]] // call the onComplete method and return when it is complete
* -> [[finalizeTask()]] // remove the task from the dependency graph and return when complete
*
* [[submit()]]
* -> [[TaskExecutor.submit()]]
* -> [[TaskExecutor.submit()]] // blocks until the task can be executed
*
* [[execute(task, future)]]
* -> [[TaskHandler.execute()]]
* -> [[TaskHandler.execute()]] // blocks until the task has finished executing
*
* [[onComplete()]]
* -> [[Task.onComplete()]]
* -> [[Task.onComplete()]] // blocks until the onComplete method returns
*
* [[finalizeTask()]]
* -> [[DependencyGraph.remove()]]
* -> [[TaskExecutor.remove()]]
* -> [[DependencyGraph.remove()]] // remove the task from the dependency graph
* -> [[TaskExecutor.remove()]] // remove the task from the task executor
*/


Expand All @@ -117,19 +115,61 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends
def execute(rootTask: Task): Int = {
logger.debug(s"Starting root: ${rootTask.name}")
logger.debug(s"Root has ${rootTask.tasksDependedOn.size} dependencies")

val rootFuture: Future[Any] = addToDependencyGraph(rootTask)
Await.result(rootFuture, Duration.Inf)
Await.result(processTask(rootTask), Duration.Inf) foreach { task =>
logger.debug(s"Root task has status ${this.statuses.getOrElse(task, "Unknown")}")
}
this.dependencyGraph.size
}

/** Enqueues the given tasks, builds and executes them. Returns a `Future` of the list of tasks. */
private def processTask(task: Task*): Future[Seq[Task]] = {
val builtTasks = task.map { t =>
recoverWith(t) {
for {
task1 <- enqueue(t) // enqueue the tasks and wait for it to have no dependencies
task2 <- buildAndExecute(task1) // build the task, and execute it and its descendents
} yield task2
}
}
Future.sequence(builtTasks)
}

/** Build the task and catch any exceptions during the call to getTasks. */
private def buildTask(task: Task): Future[Seq[Task]] = futureFailedToBuild { task.getTasks.toList }

/** Builds a task and proceeds based on if it create other tasks or itself should be executed.
*
* Call `getTasks` on the task (TODO: rename getTasks to build or the like) and handle any failure during that call.
* Next, if the task returned itself, execute it, otherwise add the returned task(s) to the dependency graph.
*
* @param task the task with no dependencies.
* @return a `Future` on the task to build and execute.
*/
private def buildAndExecute(task: Task): Future[Task] = {
requireNoDependencies(task)
this.statuses.put(task, Queued)
buildTask(task) flatMap {
case Nil => futureFailedToBuild { throw new IllegalStateException(s"No tasks to schedule for task: '${task.name}'") }
case x :: Nil if x == task => executeUnitTask(x.asInstanceOf[T])// one task and it returned itself, so execute it
case _tasks => processTask(_tasks:_*) // a different task, or more than one task, so build those tasks
} map (_ => task)
}


/** Submits and executes a task.
*/
private def executeUnitTask(task: T): Future[Any] = {
val submitFuture: Future[TaskHandler] = this.submit(task) // Submit the task
val executeFuture: Future[TaskHandler] = this.execute(task, submitFuture) // Execute the task
val onCompleteFuture: Future[TaskHandler] = this.onComplete(task, executeFuture)
this.finalizeTask(task, onCompleteFuture)
private def executeUnitTask(task: T): Future[Task] = {
val submitFuture: Future[TaskHandler] = submit(task) // Submit the task
val executeFuture: Future[TaskHandler] = executeHandler(task, submitFuture) // Execute the task
val onCompleteFuture: Future[TaskHandler] = onComplete(task, executeFuture) // Run the onComplete method
val finalizeFuture: Future[TaskHandler] = finalizeTask(task, onCompleteFuture) // Check the success or failure of the task

finalizeFuture recoverWith {
case thr: TaggedException =>
require(thr.status.isInstanceOf[Failed] || thr.status == Running, s"Expected status to be Failed or Running, but found ${thr.status}")
fail(task, thr.thr, thr.status); Future.failed(thr.thr)
case thr => throw thr
} map (_ => task)
}

/** Submit the task to the [[TaskExecutor]]. A [[TaskHandler]] is returned when the task is ready for execution.
Expand All @@ -144,7 +184,7 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends
/** Execute the task using the provided [[TaskHandler]]'s [[TaskHandler.execute()]] method. Fails with a
* [[TaggedException]] with the task status is set to [[FailedExecution]].
*/
private def execute(task: T, submitFuture: Future[TaskHandler]): Future[TaskHandler] = futureFailedExecution {
private def executeHandler(task: T, submitFuture: Future[TaskHandler]): Future[TaskHandler] = futureFailedExecution {
submitFuture flatMap { handler: TaskHandler =>
requireTaskStatus(task, Submitted)
this.statuses.put(task, Running)
Expand All @@ -170,50 +210,36 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends
}
}

/** Finalizes the state of a task. If the provided [[Future]] is successful, then set the status to succeeded. If there
* was any failure, the throwable should be a [[TaggedException]] so that the apporiate failed status can be set. */
/** Finalizes the state of a task. If the provided [[Future]] is successful, then set the status to succeeded and the
* task is removed from the dependency graph. */
private def finalizeTask(task: T, onCompleteFuture: Future[TaskHandler]) = onCompleteFuture map { handler =>
logger.debug(s"succeeded execution ${task.name}")
this.statuses.put(task, SucceededExecution)
this.dependencyGraph.remove(task)
require(this.taskExecutor.remove(task).nonEmpty, "Task could not be removed from the executor.")
logger.debug(s"removed task from dependency graph ${task.name}")
handler
} recoverWith {
case thr: TaggedException =>
require(thr.status.isInstanceOf[Failed] || thr.status == Running, s"Expected status to be Failed or Running, but found ${thr.status}")
fail(task, thr.thr, thr.status); Future.failed(thr.thr)
case thr => throw thr
}

/** Builds a task and proceeds based on if it create other tasks or itself should be executed.
*
* Call `getTasks` on the task (TODO: rename getTasks to build or the like) and handle any failure during that call.
* Next, if the task returned itself, execute it, otherwise add the returned task(s) to the dependency graph.
*
* @param task the task with no dependencies.
* @return
*/
private def buildTask(task: Task): Future[Any] = {
requireNoDependencies(task)
requireTaskStatus(task, Queued)
futureFailedToBuild { task.getTasks.toList } flatMap {
case Nil => futureFailedToBuild { throw new IllegalStateException(s"No tasks to schedule for task: [${task.name}]") }
case x :: Nil if x == task => executeUnitTask(x.asInstanceOf[T]) // one task and it returned itself, so execute it
case _tasks => Future.sequence( _tasks.map(addToDependencyGraph)).map (_ => task)
}
}

/** Add the given task to the dependency graph and update its status to [[Queued]]. */
private def addToDependencyGraph(task: Task): Future[Any] = {
logger.debug(s"addToDependencyGraph: ${task.name}")
private def enqueue(task: Task): Future[Task] = {
logger.debug(s"enqueue: ${task.name}")
require(!this.statuses.contains(task), s"Task '${task.name}' is already being executed with state: ${this.statuses(task)}")
requireNoDependencies(task, missingOk=true)
this.statuses.put(task, Pending)
this.dependencyGraph.add(task) flatMap { task =>
this.statuses.put(task, Queued)
buildTask(task)
}
this.dependencyGraph.add(task)
}

/** Provides special handling if [[Future]] fails. If the throwable is a [[TaggedException]] then the task's status
* will be changed to failed, and the underlying exception will be returned. Otherwise, we throw the failure. */
private def recoverWith[T](task: Task)(future: Future[T]): Future[T] = future recoverWith {
// If there was any failure, the throwable should be a [[TaggedException]] so that the appropriate failed status can
// be set. Other throwables will be re-thrown.
case thr: TaggedException =>
require(thr.status.isInstanceOf[Failed] || Seq(Running, Queued).contains(thr.status),
s"Expected status to be Failed, Running, or Queued, but found ${thr.status}")
fail(task, thr.thr, thr.status); Future.failed(thr.thr)
case thr => throw thr
}

/** Set the status to the failed and add the throwable to the failures map for this task */
Expand Down

0 comments on commit 4c5eec3

Please sign in to comment.