Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Various Task Manager Speedups (#373)
Browse files Browse the repository at this point in the history
* Pipeline: do not add children of tasks already added

* Pipeline: solve stack overflow in addChildren

* Task: find cycles in a collection of tasks all at once.

When a task returns a list of tasks via getTasks, we currently check for
cyclical dependencies on each task independently.  If the tasks are all
connected in the DAG, then this is really slow!  This change avoids the
repeated work by finding the strongly connected components jointly across
all of the new *to be added* tasks.

* Small performance optimizations on dense DAGs

* TaskManager: reduce the sleep time exponentially whenever any work can be
done, otherwise increase it linearly

* No need to check for cycles before executing a task, as this was already
checked when the task was added.

* Warn if a single step in execution takes longer than 30 seconds

* Optimize GraphNode

* update based on review

* ResourceSet bug when subsetting resources and some performance optimizations (#374)

* ResourceSet bug fix

If the minimum number of cores to subset to is fractional, with a different
fractional part than the maximum, then the minimum could be missed as a valid
value.

* Task Manager optimizations

* A few NaiveScheduler simplifications
  • Loading branch information
nh13 authored Mar 25, 2020
1 parent ad14fa1 commit 9313a7e
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 171 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/dagr/core/execsystem/GraphNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class GraphNode(var task: Task,
var state: GraphNodeState.Value = GraphNodeState.PREDECESSORS_AND_UNEXPANDED,
val enclosingNode: Option[GraphNode] = None) extends BaseGraphNode {

private val _predecessors = new ListBuffer[GraphNode]()
private val _predecessors = new scala.collection.mutable.LinkedHashSet[GraphNode]()

_predecessors ++= predecessorNodes

Expand Down Expand Up @@ -93,9 +93,12 @@ class GraphNode(var task: Task,
addPredecessors(predecessor.toSeq:_*)
}

/** Gets the number of predecessors */
def numPredecessors: Int = _predecessors.size

/** Get the predecessors
*
* @return the current set of predecessors, if any
*/
def predecessors: List[GraphNode] = _predecessors.toList
def predecessors: Iterable[GraphNode] = _predecessors
}
30 changes: 11 additions & 19 deletions core/src/main/scala/dagr/core/execsystem/NaiveScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,15 @@ class NaiveScheduler extends Scheduler {
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): Option[(UnitTask, ResourceSet)] = {
val systemResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingSystemMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
val jvmResourceSet: ResourceSet = ResourceSet(remainingSystemCores, remainingJvmMemory)
// Find the first task that can be executed
readyTasks
.view // lazy
.map { // pick resources
case task: ProcessTask => (task, task.pickResources(systemResourceSet))
case task: InJvmTask => (task, task.pickResources(jvmResourceSet))
}
.find { // find the first that returned a resource set
case (_, Some(resourceSet)) => true
case _ => false
}
.map { // get the resource set
case (task, Some(resourceSet)) => (task, resourceSet)
case _ => throw new IllegalStateException("BUG")
.view
.flatMap { // pick resources
case task: ProcessTask => task.pickResources(systemResourceSet).map { rs => (task, rs) }
case task: InJvmTask => task.pickResources(jvmResourceSet).map { rs => (task, rs) }
}
.headOption
}

/** Runs one round of scheduling, trying to schedule as many ready tasks as possible given the
Expand All @@ -67,30 +60,29 @@ class NaiveScheduler extends Scheduler {
private def scheduleOnce(readyTasks: Iterable[UnitTask],
remainingSystemCores: Cores,
remainingSystemMemory: Memory,
remainingJvmMemory: Memory): List[(UnitTask, ResourceSet)] = {
remainingJvmMemory: Memory): Seq[(UnitTask, ResourceSet)] = {
// no more tasks ready to be scheduled
if (readyTasks.isEmpty) Nil
if (readyTasks.isEmpty) Seq.empty
else {
logger.debug(s"the resources were [System cores=" + remainingSystemCores.value
+ " System memory=" + Resource.parseBytesToSize(remainingSystemMemory.value)
+ " JVM memory=" + Resource.parseBytesToSize(remainingJvmMemory.value) + " ]")

// try one round of scheduling, and recurse if a task could be scheduled
scheduleOneTask(readyTasks, remainingSystemCores, remainingSystemMemory, remainingJvmMemory) match {
case None =>
Nil
case None => Seq.empty
case Some((task: UnitTask, resourceSet: ResourceSet)) =>
logger.debug("task to schedule is [" + task.name + "]")
logger.debug(s"task [${task.name}] uses the following resources [" + resourceSet + "]")
List[(UnitTask, ResourceSet)]((task, resourceSet)) ++ (task match {
case processTask: ProcessTask =>
case _: ProcessTask =>
scheduleOnce(
readyTasks = readyTasks.filterNot(t => t == task),
remainingSystemCores = remainingSystemCores - resourceSet.cores,
remainingSystemMemory = remainingSystemMemory - resourceSet.memory,
remainingJvmMemory = remainingJvmMemory
)
case inJvmTask: InJvmTask =>
case _: InJvmTask =>
scheduleOnce(
readyTasks = readyTasks.filterNot(t => t == task),
remainingSystemCores = remainingSystemCores - resourceSet.cores,
Expand Down
32 changes: 27 additions & 5 deletions core/src/main/scala/dagr/core/execsystem/ResourceSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,35 @@ case class ResourceSet(cores: Cores = Cores.none, memory: Memory = Memory.none)

/**
* Constructs a subset of this resource set with a fixed amount of memory and a variable
* number of cores. Will greedily assign the highest number of cores possible.
* number of cores. Will greedily assign the highest number of cores possible. If the maximum cores is a whole
* number, then a whole number will be returned, unless the minimum cores (which can be a fractional number of cores)
* is the only valid value. If the maximum cores is not a whole number, then the maximum fractional amount will be
* returned.
*
* Example 1: minCores=1, maxCores=5, this.cores=4.5, then 4 cores will be returned.
* Example 2: minCores=1, maxCores=5.1, this.cores=4.5, then 4.5 cores will be returned.
* Example 3: minCores=1, maxCores=5, this.cores=1.5, then 1 core will be returned.
* Example 4: minCores=1.5, maxCores=5, this.cores=1.5, then 1.5 cores will be returned.
*/
def subset(minCores: Cores, maxCores: Cores, memory: Memory) : Option[ResourceSet] = {
val min = minCores.value
val max = maxCores.value
val cores = Range.BigDecimal.inclusive(max, min, -1).find(cores => subset(Cores(cores.doubleValue), memory).isDefined)
cores.map(c => ResourceSet(Cores(c.doubleValue), memory))
if (!subsettable(ResourceSet(minCores, memory))) None else {
val coresValue = {
// Try to return a whole number value if maxCores is a whole number. If no whole number exists that is greater
// than or equal to minCores, then just use minCores (which could be fractional). If maxCores is fractional,
// then return a fractional value.
if (maxCores.value.isValidInt) {
// Get the number of cores, but rounded down to get a whole number value
val minValue = Math.floor(Math.min(this.cores.value, maxCores.value))
// If the number rounded down is smaller than the min-cores, then just return the min-cores
if (minValue < minCores.value) minCores.value else minValue
} else { // any fractional number will do
Math.min(this.cores.value, maxCores.value)
}
}
val resourceSet = ResourceSet(Cores(coresValue), memory)
require(subsettable(resourceSet))
Some(resourceSet)
}
}

/**
Expand Down
Loading

0 comments on commit 9313a7e

Please sign in to comment.