Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ jobs:
run-scala-tests:
<<: *test-defaults
# project/CirclePlugin.scala does its own test splitting in SBT based on CIRCLE_NODE_INDEX, CIRCLE_NODE_TOTAL
parallelism: 12
parallelism: 9
# Spark runs a lot of tests in parallel, we need 16 GB of RAM for this
resource_class: xlarge
steps:
Expand Down
34 changes: 24 additions & 10 deletions project/CirclePlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import sbt.Keys._
import sbt._
import sbt.plugins.JvmPlugin
import scalaz.Dequeue
import scalaz.Heap
import scalaz.Order

//noinspection ScalaStyle
object CirclePlugin extends AutoPlugin {
Expand Down Expand Up @@ -149,12 +151,19 @@ object CirclePlugin extends AutoPlugin {
val tests = Dequeue[(TestKey, Double)](
allTestsTimings.toIndexedSeq.sortBy { case (key, runTime) => (runTime, key) } : _*)

// A bucket of tests assigned to one node.
// `tests` holds the keys of the tests in the bucket; `runTime` is the summed
// estimated run time of those tests (built from the running `soFar` total, and
// increased whenever a further test is added to the group).
case class Group(tests: List[TestKey], runTime: Double)

// Ordering over groups that compares them solely by their accumulated run time.
// Needed implicitly to build a scalaz Heap of groups.
implicit val groupOrder: Order[Group] = {
  // Provides the Order[Double] instance that the derived ordering delegates to.
  import scalaz.std.anyVal._
  Order[Double].contramap[Group](group => group.runTime)
}

@tailrec
def process(tests: Dequeue[(TestKey, Double)],
soFar: Double = 0d,
takeLeft: Boolean = true,
acc: List[TestKey] = Nil,
groups: List[List[TestKey]] = Nil): List[List[TestKey]] = {
groups: List[Group] = Nil): List[Group] = {

if (groups.size == totalNodes || tests.isEmpty) {
// Short circuit the logic if we've just completed the last group
Expand All @@ -163,12 +172,17 @@ object CirclePlugin extends AutoPlugin {
return if (tests.isEmpty) {
groups
} else {
// Fit all remaining tests in the last issued bucket.
val lastGroup +: restGroups = groups
val toFit = tests.toStream.map(_._1).force
log.info(s"Fitting remaining tests into first bucket (which already has " +
s"${lastGroup.size} tests): $toFit")
(toFit ++: lastGroup) :: restGroups
log.info(s"Fitting remaining tests into smallest buckets: $toFit")
// Fit all remaining tests into the least used buckets.
// Import needed for creating a Heap from a List (Heap.fromData requires a Foldable[List] instance)
import scalaz.std.list._
tests.foldLeft(Heap.fromData(groups)) { case(heap, (test, runTime)) =>
heap.uncons match {
case Some((group, rest)) =>
rest.insert(group.copy(test :: group.tests, runTime + group.runTime))
}
}.toList
}
}

Expand All @@ -192,7 +206,7 @@ object CirclePlugin extends AutoPlugin {
case x@TestCandidate((_, runTime), _, _) if soFar + runTime <= timePerNode => x
} match {
case None =>
process(tests, 0d, takeLeft = true, Nil, acc :: groups)
process(tests, 0d, takeLeft = true, Nil, Group(acc, soFar) :: groups)
case Some(TestCandidate((key, runTime), rest, fromLeft)) =>
process(rest, soFar + runTime, fromLeft, key :: acc, groups)
}
Expand All @@ -202,11 +216,11 @@ object CirclePlugin extends AutoPlugin {
val rootTarget = (target in LocalRootProject).value
val bucketsFile = rootTarget / "tests-by-bucket.json"
log.info(s"Saving test distribution into $totalNodes buckets to: $bucketsFile")
mapper.writeValue(bucketsFile, buckets)
val timingsPerBucket = buckets.map(_.iterator.map(allTestsTimings.apply).sum)
mapper.writeValue(bucketsFile, buckets.map(_.tests))
val timingsPerBucket = buckets.map(_.tests.iterator.map(allTestsTimings.apply).sum)
log.info(s"Estimated test timings per bucket: $timingsPerBucket")

val bucket = buckets.lift.apply(index).getOrElse(Nil)
val bucket = buckets.map(_.tests).lift.apply(index).getOrElse(Nil)

val groupedByProject = bucket.flatMap(testsByKey.apply)
.groupBy(_.project)
Expand Down