Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

New Exec system #278

Closed
wants to merge 3 commits into from
Closed

New Exec system #278

wants to merge 3 commits into from

Conversation

nh13
Copy link
Member

@nh13 nh13 commented Apr 8, 2017

@tfenne I think this is now achieves feature parity with the current execution system. I added an option --experimental-execution to enable the new task system. Interactive mode, final reports, and status logging are supported, though the latter is substantially different (and more verbose). I have tested it on some simple workflows (see one example below). I have a a lot of cleanup to complete on the actual code, as well as likely a bunch more tests, but I am very pleased with the progress. After cleanup, I think I am ready to add in "replay".

package dagr.pipelines

import dagr.core.cmdline.Pipelines
import dagr.core.tasksystem._
import com.fulcrumgenomics.sopt.{arg, clp}
import dagr.core.execsystem.{Cores, Memory, ResourceSet}

private trait GreedyResourcePicking extends UnitTask {
  override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = {
    val mem = Memory("1g")
    (8 to 1 by -1).map(c => ResourceSet(Cores(c), mem)).find(rs => availableResources.subset(rs).isDefined)
  }
}

private class SleepProcessTask(seconds: Int = 1) extends ProcessTask with GreedyResourcePicking {
  override def args: Seq[Any] = "sleep" :: s"$seconds" :: Nil
}

private class SleepInJvmTask(seconds: Int = 1) extends SimpleInJvmTask with GreedyResourcePicking {
  def run(): Unit = Thread.sleep(seconds * 1000)
}

/**
  * Very simple example pipeline that creates random tasks and dependencies
  */
@clp(description="A bunch of sleep tasks.", group = classOf[Pipelines])
class SleepyPipeline
( @arg(flag='j', doc="Use JVM tasks") val jvmTask: Boolean = false,
  @arg(flag='n', doc="The number of tasks to create") val numTasks: Int = 100,
  @arg(flag='p', doc="The probability of creating a dependency") val dependencyProbability: Double = 0.1
) extends Pipeline {

  private def toATask: () => SleepProcessTask = () => new SleepProcessTask
  private def toBTask: () => SleepInJvmTask = () => new SleepInJvmTask
  private val toTask   = if (jvmTask) toBTask else toATask
  private val taskType = if (jvmTask) "JVM" else "Shell"

  override def build(): Unit = {
    // create the tasks
    val tasks: Seq[Task] = for (i <- 0 to numTasks) yield toTask() withName s"task-$i"

    // make them depend on previous tasks
    var rootTasks = Seq.range(start=0, numTasks).toSet
    val randomNumberGenerator = scala.util.Random
    for (i <- 0 until numTasks) {
      for (j <- 0 until i) {
        if (randomNumberGenerator.nextFloat < dependencyProbability) {
          tasks(j) ==> tasks(i)
          rootTasks = rootTasks - i
        }
      }
    }

    rootTasks.foreach { i =>
      root ==> tasks(i)
    }
  }
}

@nh13 nh13 force-pushed the nh_exec2 branch 12 times, most recently from afe895b to e652d9b Compare April 9, 2017 07:34
}
}

class ExecutorImpl(protected val taskExecutor: TaskExecutor) extends Executor with LazyLogging {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using Futures for callbacks, and I am a bit of a novice here. But I think the idea is sound.


// Execute the task
val executeFuture: Future[TaskHandler] = submitFuture flatMap { handler: TaskHandler =>
// TODO: apply the resources it was scheduled with: task.applyResources(resources)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no long relevant

@codecov-io
Copy link

codecov-io commented Apr 27, 2017

Codecov Report

Merging #278 into master will increase coverage by 0.81%.
The diff coverage is 93.68%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #278      +/-   ##
==========================================
+ Coverage   91.38%   92.19%   +0.81%     
==========================================
  Files          31       42      +11     
  Lines        1137     1537     +400     
  Branches      106      127      +21     
==========================================
+ Hits         1039     1417     +378     
- Misses         98      120      +22
Impacted Files Coverage Δ
...n/scala/dagr/core/tasksystem/SimpleInJvmTask.scala 100% <ø> (ø) ⬆️
...main/scala/dagr/core/tasksystem/ShellCommand.scala 75% <ø> (ø) ⬆️
core/src/main/scala/dagr/core/exec/Scheduler.scala 100% <ø> (ø)
...src/main/scala/dagr/core/tasksystem/Pipeline.scala 94.73% <ø> (+5.26%) ⬆️
...src/main/scala/dagr/core/tasksystem/UnitTask.scala 100% <ø> (ø) ⬆️
core/src/main/scala/dagr/core/exec/Resource.scala 93.75% <ø> (ø)
...c/main/scala/dagr/core/tasksystem/Dependable.scala 72.72% <ø> (ø) ⬆️
...e/src/main/scala/dagr/core/tasksystem/Linker.scala 100% <ø> (ø) ⬆️
...src/main/scala/dagr/core/exec/NaiveScheduler.scala 96.66% <ø> (ø)
.../main/scala/dagr/core/tasksystem/Schedulable.scala 100% <ø> (ø) ⬆️
... and 39 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 58be32e...508e2c1. Read the comment docs.

@nh13 nh13 force-pushed the nh_exec2 branch 12 times, most recently from 788d6dd to 063dee4 Compare May 15, 2017 09:00
@nh13 nh13 force-pushed the nh_exec2 branch 2 times, most recently from 5f62c83 to a19fe7f Compare May 19, 2017 07:01
@nh13 nh13 force-pushed the nh_exec2 branch 2 times, most recently from d7806c2 to 63b65b5 Compare May 19, 2017 08:01
@nh13 nh13 requested a review from tfenne May 19, 2017 08:07
@nh13 nh13 removed the request for review from tfenne May 19, 2017 08:07
@nh13 nh13 unassigned tfenne May 19, 2017
@nh13 nh13 requested a review from tfenne May 29, 2017 22:34
@nh13
Copy link
Member Author

nh13 commented May 29, 2017

@tfenne I think this is finally ready for review!

@nh13 nh13 changed the title WIP: New Exec system New Exec system May 30, 2017
@nh13 nh13 force-pushed the nh_exec2 branch 3 times, most recently from dac06fd to e4e7933 Compare June 6, 2017 21:01
@nh13
Copy link
Member Author

nh13 commented Jun 11, 2017

Closing in favor of: #299

@nh13 nh13 closed this Jun 11, 2017
@nh13 nh13 deleted the nh_exec2 branch June 11, 2017 05:34
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants