From 5e942666870b86cc1fbdf7f07c1d5eb8ebaaa9ab Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 26 Apr 2017 21:17:59 -0700 Subject: [PATCH] A few more updates --- .../dagr/core/execsystem2/Executor.scala | 147 ++++++++++-------- 1 file changed, 86 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/dagr/core/execsystem2/Executor.scala b/core/src/main/scala/dagr/core/execsystem2/Executor.scala index 3779e064..43ffb182 100644 --- a/core/src/main/scala/dagr/core/execsystem2/Executor.scala +++ b/core/src/main/scala/dagr/core/execsystem2/Executor.scala @@ -26,7 +26,7 @@ package dagr.core.execsystem2 import dagr.commons.util.LazyLogging -import dagr.core.tasksystem.Task +import dagr.core.tasksystem.{Task, UnitTask} import scala.collection.mutable import scala.concurrent.ExecutionContext.Implicits.global @@ -72,39 +72,37 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends * * The following describes the routing of each method. The tasks statuses are updated appropriately: * - * [[execute(task)]] - * -> [[addToDependencyGraph()]] + * [[execute(task)]] // executes the task assuming it is the root of the graph + * -> [[processTask()]] // executes the task and its children end-to-end * - * [[addToDependencyGraph()]] - * -> [[DependencyGraph.add]] - * -> *** BLOCK *** the future that is returned above only completes when the added task has no more dependencies - * -> [[buildTask()]] + * [[processTask()]] // executes a task end-to-end + * -> [[enqueue]] // add the task to the dependency graph and block until it has no more dependents + * -> [[buildAndExecute()]] // builds the task, and either executes it directly, or processed each child end-to-end * - * [[buildTask()]] + * [[buildAndExecute()]] // builds and executes a task * -> case 1: a unit task - * -> [[executeUnitTask()]] + * -> [[executeUnitTask()]] // execute the task * -> case 2: a list of tasks (ex. Pipeline) - * -> [[addToDependencyGraph()]] + * -> [[processTask()]] // execute each child end-to-end * - * [[executeUnitTask()]] - * -> [[submit()]] - * -> ** BLOCK *** the future only returns when the task can be executed - * -> [[execute(task, future)]] - * -> [[onComplete()]] - * -> [[finalizeTask()]] + * [[executeUnitTask()]] // handles the submission and execution of task + * -> [[submit()]] // submit the [[TaskExecutor]] and return when the task can be executed + * -> [[executeHandler()]] // execute the task with the [[TaskHandler]] and return when it has finished executing + * -> [[onComplete()]] // call the onComplete method and return when it is complete + * -> [[finalizeTask()]] // remove the task from the dependency graph and return when complete * * [[submit()]] - * -> [[TaskExecutor.submit()]] + * -> [[TaskExecutor.submit()]] // blocks until the task can be executed * * [[execute(task, future)]] - * -> [[TaskHandler.execute()]] + * -> [[TaskHandler.execute()]] // blocks until the task has finished executing * * [[onComplete()]] - * -> [[Task.onComplete()]] + * -> [[Task.onComplete()]] // blocks until the onComplete method returns * * [[finalizeTask()]] - * -> [[DependencyGraph.remove()]] - * -> [[TaskExecutor.remove()]] + * -> [[DependencyGraph.remove()]] // remove the task from the dependency graph + * -> [[TaskExecutor.remove()]] // remove the task from the task executor */ @@ -117,19 +115,60 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends def execute(rootTask: Task): Int = { logger.debug(s"Starting root: ${rootTask.name}") logger.debug(s"Root has ${rootTask.tasksDependedOn.size} dependencies") - - val rootFuture: Future[Any] = addToDependencyGraph(rootTask) - Await.result(rootFuture, Duration.Inf) + Await.result(processTask(rootTask), Duration.Inf) foreach { task => + logger.debug(s"Root task has status ${this.statuses.getOrElse(task, "Unknown")}") + } this.dependencyGraph.size } + /** Enqueues the given tasks, builds and executes them. Returns a `Future` of the list of tasks. */ + private def processTask(task: Task*): Future[Seq[Task]] = { + val builtTasks = task.map { t => + recoverWith(t) { + for { + task1 <- enqueue(t) // enqueue the tasks and wait for it to have no dependencies + task2 <- buildAndExecute(task1) // build the task, and execute it and its descendents + } yield task2 + } + } + Future.sequence(builtTasks) + } + + /** Build the task and catch any exceptions during the call to getTasks. */ + private def buildTask(task: Task): Future[Seq[Task]] = futureFailedToBuild { task.getTasks.toList } + + /** Builds a task and proceeds based on if it create other tasks or itself should be executed. + * + * Call `getTasks` on the task (TODO: rename getTasks to build or the like) and handle any failure during that call. + * Next, if the task returned itself, execute it, otherwise add the returned task(s) to the dependency graph. + * + * @param task the task with no dependencies. + * @return a `Future` on the task to build and execute. + */ + private def buildAndExecute(task: Task): Future[Task] = { + requireNoDependencies(task) + this.statuses.put(task, Queued) + buildTask(task) flatMap { + case Nil => futureFailedToBuild { throw new IllegalStateException(s"No tasks to schedule for task: '${task.name}'") } + case x :: Nil if x == task => executeUnitTask(x.asInstanceOf[T])// one task and it returned itself, so execute it + case _tasks => processTask(_tasks:_*) // a different task, or more than one task, so build those tasks + } map (_ => task) + } + /** Submits and executes a task. */ - private def executeUnitTask(task: T): Future[Any] = { - val submitFuture: Future[TaskHandler] = this.submit(task) // Submit the task - val executeFuture: Future[TaskHandler] = this.execute(task, submitFuture) // Execute the task - val onCompleteFuture: Future[TaskHandler] = this.onComplete(task, executeFuture) - this.finalizeTask(task, onCompleteFuture) + private def executeUnitTask(task: T): Future[Task] = { + val submitFuture: Future[TaskHandler] = submit(task) // Submit the task + val executeFuture: Future[TaskHandler] = executeHandler(task, submitFuture) // Execute the task + val onCompleteFuture: Future[TaskHandler] = onComplete(task, executeFuture) // Run the onComplete method + val finalizeFuture: Future[TaskHandler] = finalizeTask(task, onCompleteFuture) // Check the success or failure of the task + + finalizeFuture recoverWith { + case thr: TaggedException => + require(thr.status.isInstanceOf[Failed] || thr.status == Running, s"Expected status to be Failed or Running, but found ${thr.status}") + fail(task, thr.thr, thr.status); Future.failed(thr.thr) + case thr => throw thr + } map (_ => task) } /** Submit the task to the [[TaskExecutor]]. A [[TaskHandler]] is returned when the task is ready for execution. @@ -144,7 +183,7 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends /** Execute the task using the provided [[TaskHandler]]'s [[TaskHandler.execute()]] method. Fails with a * [[TaggedException]] with the task status is set to [[FailedExecution]]. */ - private def execute(task: T, submitFuture: Future[TaskHandler]): Future[TaskHandler] = futureFailedExecution { + private def executeHandler(task: T, submitFuture: Future[TaskHandler]): Future[TaskHandler] = futureFailedExecution { submitFuture flatMap { handler: TaskHandler => requireTaskStatus(task, Submitted) this.statuses.put(task, Running) @@ -170,8 +209,8 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends } } - /** Finalizes the state of a task. If the provided [[Future]] is successful, then set the status to succeeded. If there - * was any failure, the throwable should be a [[TaggedException]] so that the apporiate failed status can be set. */ + /** Finalizes the state of a task. If the provided [[Future]] is successful, then set the status to succeeded and the + * task is removed from the dependency graph. */ private def finalizeTask(task: T, onCompleteFuture: Future[TaskHandler]) = onCompleteFuture map { handler => logger.debug(s"succeeded execution ${task.name}") this.statuses.put(task, SucceededExecution) @@ -179,41 +218,27 @@ class ExecutorImpl[T<:Task](protected val taskExecutor: TaskExecutor[T]) extends require(this.taskExecutor.remove(task).nonEmpty, "Task could not be removed from the executor.") logger.debug(s"removed task from dependency graph ${task.name}") handler - } recoverWith { - case thr: TaggedException => - require(thr.status.isInstanceOf[Failed] || thr.status == Running, s"Expected status to be Failed or Running, but found ${thr.status}") - fail(task, thr.thr, thr.status); Future.failed(thr.thr) - case thr => throw thr - } - - /** Builds a task and proceeds based on if it create other tasks or itself should be executed. - * - * Call `getTasks` on the task (TODO: rename getTasks to build or the like) and handle any failure during that call. - * Next, if the task returned itself, execute it, otherwise add the returned task(s) to the dependency graph. - * - * @param task the task with no dependencies. - * @return - */ - private def buildTask(task: Task): Future[Any] = { - requireNoDependencies(task) - requireTaskStatus(task, Queued) - futureFailedToBuild { task.getTasks.toList } flatMap { - case Nil => futureFailedToBuild { throw new IllegalStateException(s"No tasks to schedule for task: [${task.name}]") } - case x :: Nil if x == task => executeUnitTask(x.asInstanceOf[T]) // one task and it returned itself, so execute it - case _tasks => Future.sequence( _tasks.map(addToDependencyGraph)).map (_ => task) - } } /** Add the given task to the dependency graph and update its status to [[Queued]]. */ - private def addToDependencyGraph(task: Task): Future[Any] = { - logger.debug(s"addToDependencyGraph: ${task.name}") + private def enqueue(task: Task): Future[Task] = { + logger.debug(s"enqueue: ${task.name}") require(!this.statuses.contains(task), s"Task '${task.name}' is already being executed with state: ${this.statuses(task)}") requireNoDependencies(task, missingOk=true) this.statuses.put(task, Pending) - this.dependencyGraph.add(task) flatMap { task => - this.statuses.put(task, Queued) - buildTask(task) - } + this.dependencyGraph.add(task) + } + + /** Provides special handling if [[Future]] fails. If the throwable is a [[TaggedException]] then the task's status + * will be changed to failed, and the underlying exception will be returned. Otherwise, we throw the failure. */ + private def recoverWith[T](task: Task)(future: Future[T]): Future[T] = future recoverWith { + // If there was any failure, the throwable should be a [[TaggedException]] so that the appropriate failed status can + // be set. Other throwables will be re-thrown. + case thr: TaggedException => + require(thr.status.isInstanceOf[Failed] || Seq(Running, Queued).contains(thr.status), + s"Expected status to be Failed, Running, or Queued, but found ${thr.status}") + fail(task, thr.thr, thr.status); Future.failed(thr.thr) + case thr => throw thr } /** Set the status to the failed and add the throwable to the failures map for this task */