40 changes: 40 additions & 0 deletions core/benchmarks/CoalescedRDDBenchmark-results.txt
@@ -0,0 +1,40 @@
================================================================================================
Coalesced RDD , large scale
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
Coalesced RDD: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Coalesce Num Partitions: 100 Num Hosts: 1 346 364 24 0.3 3458.9 1.0X
Coalesce Num Partitions: 100 Num Hosts: 5 258 264 6 0.4 2579.0 1.3X
Coalesce Num Partitions: 100 Num Hosts: 10 242 249 7 0.4 2415.2 1.4X
Coalesce Num Partitions: 100 Num Hosts: 20 237 242 7 0.4 2371.7 1.5X
Coalesce Num Partitions: 100 Num Hosts: 40 230 231 1 0.4 2299.8 1.5X
Coalesce Num Partitions: 100 Num Hosts: 80 222 233 14 0.4 2223.0 1.6X
Coalesce Num Partitions: 500 Num Hosts: 1 659 665 5 0.2 6590.4 0.5X
Coalesce Num Partitions: 500 Num Hosts: 5 340 381 47 0.3 3395.2 1.0X
Coalesce Num Partitions: 500 Num Hosts: 10 279 307 47 0.4 2788.3 1.2X
Coalesce Num Partitions: 500 Num Hosts: 20 259 261 2 0.4 2591.9 1.3X
Coalesce Num Partitions: 500 Num Hosts: 40 241 250 15 0.4 2406.5 1.4X
Coalesce Num Partitions: 500 Num Hosts: 80 235 237 3 0.4 2349.9 1.5X
Coalesce Num Partitions: 1000 Num Hosts: 1 1050 1053 4 0.1 10503.2 0.3X
Coalesce Num Partitions: 1000 Num Hosts: 5 405 407 2 0.2 4049.5 0.9X
Coalesce Num Partitions: 1000 Num Hosts: 10 320 322 2 0.3 3202.7 1.1X
Coalesce Num Partitions: 1000 Num Hosts: 20 276 277 0 0.4 2762.3 1.3X
Coalesce Num Partitions: 1000 Num Hosts: 40 257 260 5 0.4 2571.2 1.3X
Coalesce Num Partitions: 1000 Num Hosts: 80 245 252 13 0.4 2448.9 1.4X
Coalesce Num Partitions: 5000 Num Hosts: 1 3099 3145 55 0.0 30988.6 0.1X
Coalesce Num Partitions: 5000 Num Hosts: 5 1037 1050 20 0.1 10374.4 0.3X
Coalesce Num Partitions: 5000 Num Hosts: 10 626 633 8 0.2 6261.8 0.6X
Coalesce Num Partitions: 5000 Num Hosts: 20 426 431 5 0.2 4258.6 0.8X
Coalesce Num Partitions: 5000 Num Hosts: 40 328 341 22 0.3 3275.4 1.1X
Coalesce Num Partitions: 5000 Num Hosts: 80 272 275 4 0.4 2721.4 1.3X
Coalesce Num Partitions: 10000 Num Hosts: 1 5516 5526 9 0.0 55156.8 0.1X
Coalesce Num Partitions: 10000 Num Hosts: 5 1956 1992 48 0.1 19560.9 0.2X
Coalesce Num Partitions: 10000 Num Hosts: 10 1045 1057 18 0.1 10447.4 0.3X
Coalesce Num Partitions: 10000 Num Hosts: 20 637 658 24 0.2 6373.2 0.5X
Coalesce Num Partitions: 10000 Num Hosts: 40 431 448 15 0.2 4312.9 0.8X
Coalesce Num Partitions: 10000 Num Hosts: 80 326 328 2 0.3 3263.4 1.1X
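For context, each case above times the same operation that the benchmark source at the bottom of this PR performs: build an RDD whose elements carry preferred host locations, coalesce it, and materialize the resulting partitions. A minimal standalone sketch of one such case (assuming a local SparkContext; the names and sizes here are illustrative, not part of the patch):

import org.apache.spark.SparkContext

val sc = new SparkContext(master = "local[4]", appName = "coalesce-sketch")
val numBlocks = 100000
val hosts = (1 to 10).map("m" + _)             // 10 simulated preferred hosts
val rnd = new scala.util.Random(0x1337)
val blocks = (1 to numBlocks).map { i =>
  (i, Seq(hosts(rnd.nextInt(hosts.size))))     // each block prefers one host
}
sc.makeRDD(blocks).coalesce(100).partitions    // the timed step: 100 coalesced partitions
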


34 changes: 17 additions & 17 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -58,7 +58,7 @@ private[spark] case class CoalescedRDDPartition(
val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
preferredLocation.exists(parentPreferredLocations.contains)
}
- if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+ if (parents.isEmpty) 0.0 else loc.toDouble / parents.size.toDouble
}
}
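
The quantity computed in the hunk above is simply the share of parent partitions whose preferred hosts include this coalesced partition's preferred location, defined as 0.0 when there are no parents. A minimal sketch of the same arithmetic (hypothetical helper and values, not project code):

// prefersHere(i) is true when parent partition i lists the group's preferred host
def fractionLocal(prefersHere: Seq[Boolean]): Double =
  if (prefersHere.isEmpty) 0.0
  else prefersHere.count(identity).toDouble / prefersHere.size

fractionLocal(Seq(true, true, true, false))  // 3 of 4 parents are local => 0.75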

@@ -91,7 +91,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) =>
val ids = pg.partitions.map(_.index).toArray
- new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+ CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}

@@ -116,7 +116,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
/**
* Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
* then the preferred machine will be one which most parent splits prefer too.
- * @param partition
+ * @param partition the partition for which to retrieve the preferred machine, if exists
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -156,9 +156,11 @@ private[spark] class CoalescedRDD[T: ClassTag](

private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
extends PartitionCoalescer {
- def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions
- def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
-   if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+ implicit val partitionGroupOrdering: Ordering[PartitionGroup] =
+   (o1: PartitionGroup, o2: PartitionGroup) =>
+     java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)

Member:
Hi, All.
This seems to break scala-2.11 build.

[error] ../core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala:161: type mismatch;
[error]  found   : (org.apache.spark.rdd.PartitionGroup, org.apache.spark.rdd.PartitionGroup) => Int
[error]  required: Ordering[org.apache.spark.rdd.PartitionGroup]
[error]     (o1: PartitionGroup, o2: PartitionGroup) =>
[error]          

Contributor Author:
Thanks. That’s unfortunate. I’ll fix it later tonight.

Member:
Then, let me revert this first to recover the 2.11 build for the other PRs. Since this PR is already approved, I believe the next PR will be easily accepted, @fitermay.

Contributor:
@dongjoon-hyun @srowen: Would it be a good idea to extend the PR builder to run a compile with Scala 2.11 (without any test run)?

I know it is an extra 10-15 minutes, but against the 4-hour test run it might be worth it to prevent such situations; on the other hand, this must be very rare. What is your opinion?

Member:
I agree with you, @attilapiros. But IIRC there was a discussion on that issue, and the decision at that time was that the current cost is not high enough for that.
Committers have a responsibility to monitor their commits, and we are usually able to do a HOTFIX or revert in a short time.

Contributor:
OK, thanks.
Yes, it must be very rare.

Member:
We're going to drop 2.11 support soonish anyway, so I think for now we accept the occasional breaks and fix after the fact rather than double the PR builders.
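
For reference, the 2.11 failure above comes from relying on SAM conversion of a plain lambda into an Ordering, which Scala 2.11 does not perform by default. A cross-buildable sketch (not necessarily the follow-up fix that was eventually merged) would construct the Ordering explicitly:

// Compiles on both Scala 2.11 and 2.12: no SAM conversion needed.
implicit val partitionGroupOrdering: Ordering[PartitionGroup] =
  Ordering.by((pg: PartitionGroup) => pg.numPartitions)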


val rnd = new scala.util.Random(7919) // keep this class deterministic

@@ -178,7 +180,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}

- class PartitionLocations(prev: RDD[_]) {
+ private class PartitionLocations(prev: RDD[_]) {

// contains all the partitions from the previous RDD that don't have preferred locations
val partsWithoutLocs = ArrayBuffer[Partition]()
@@ -213,15 +215,14 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}

/**
- * Sorts and gets the least element of the list associated with key in groupHash
+ * Gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
*
* @param key string representing a partitioned group on preferred machine key
* @return Option of [[PartitionGroup]] that has least elements for key
*/
- def getLeastGroupHash(key: String): Option[PartitionGroup] = {
-   groupHash.get(key).map(_.sortWith(compare).head)
- }
+ def getLeastGroupHash(key: String): Option[PartitionGroup] =
+   groupHash.get(key).filter(_.nonEmpty).map(_.min)
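
The filter(_.nonEmpty) guard matters because min, like the old sortWith(...).head, throws on an empty collection. A small illustration with plain collections (hypothetical values, not project code):

import scala.collection.mutable.ArrayBuffer

Some(ArrayBuffer.empty[Int]).filter(_.nonEmpty).map(_.min)  // None: the empty group is skipped safely
Some(ArrayBuffer(3, 1, 2)).filter(_.nonEmpty).map(_.min)    // Some(1), the smallest element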

def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
@@ -236,12 +237,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* is assigned a preferredLocation. This uses coupon collector to estimate how many
* preferredLocations it must rotate through until it has seen most of the preferred
* locations (2 * n log(n))
- * @param targetLen
+ * @param targetLen The number of desired partition groups
*/
def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
// deal with empty case, just create targetLen partition groups with no preferred location
if (partitionLocs.partsWithLocs.isEmpty) {
- (1 to targetLen).foreach(x => groupArr += new PartitionGroup())
+ (1 to targetLen).foreach(_ => groupArr += new PartitionGroup())
return
}
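
The 2 * n log(n) figure mentioned in the scaladoc above is the usual coupon-collector bound: seeing all n distinct preferred locations takes about n * H(n) ≈ n ln n random draws in expectation, so rotating through roughly twice that many is enough to have seen most of them. A small illustrative calculation (standalone, not project code):

// Expected draws to collect all n coupons: n * (1 + 1/2 + ... + 1/n) ≈ n ln n
def expectedDraws(n: Int): Double = n * (1 to n).map(1.0 / _).sum

expectedDraws(100)        // ≈ 518.7 draws in expectation
2 * 100 * math.log(100)   // ≈ 921.0, the bound the comment refers to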

@@ -297,9 +298,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
// least loaded pref locs
- val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
- val prefPart = if (pref == Nil) None else pref.head
-
+ val pref = currPrefLocs(p, prev).flatMap(getLeastGroupHash)
+ val prefPart = if (pref.isEmpty) None else Some(pref.min)
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = {
@@ -351,7 +351,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val partIter = partitionLocs.partsWithLocs.iterator
groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
while (partIter.hasNext && pg.numPartitions == 0) {
- var (nxt_replica, nxt_part) = partIter.next()
+ var (_, nxt_part) = partIter.next()
if (!initialHash.contains(nxt_part)) {
pg.partitions += nxt_part
initialHash += nxt_part
80 changes: 80 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.collection.immutable

import org.apache.spark.SparkContext
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}

/**
* Benchmark for CoalescedRDD.
* Measures rdd.coalesce performance under various combinations of
* coalesced partitions and preferred hosts.
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar>
* 2. build/sbt "core/test:runMain <this class>"
* 3. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
* Results will be written to "benchmarks/CoalescedRDDBenchmark-results.txt".
* }}}
*/
object CoalescedRDDBenchmark extends BenchmarkBase {
val seed = 0x1337
val sc = new SparkContext(master = "local[4]", appName = "test")

private def coalescedRDD(numIters: Int): Unit = {
val numBlocks = 100000
val benchmark = new Benchmark("Coalesced RDD", numBlocks, output = output)
for (numPartitions <- Seq(100, 500, 1000, 5000, 10000)) {
for (numHosts <- Seq(1, 5, 10, 20, 40, 80)) {

import collection.mutable
val hosts = mutable.ArrayBuffer[String]()
(1 to numHosts).foreach(hosts += "m" + _)
hosts.length
val rnd = scala.util.Random
rnd.setSeed(seed)
val blocks: immutable.Seq[(Int, Seq[String])] = (1 to numBlocks).map { i =>
(i, hosts(rnd.nextInt(hosts.size)) :: Nil)
}

benchmark.addCase(
s"Coalesce Num Partitions: $numPartitions Num Hosts: $numHosts",
numIters) { _ =>
performCoalesce(blocks, numPartitions)
}
}
}

benchmark.run()
}

private def performCoalesce(blocks: immutable.Seq[(Int, Seq[String])], numPartitions: Int) {
sc.makeRDD(blocks).coalesce(numPartitions).partitions
}

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
val numIters = 3
runBenchmark("Coalesced RDD , large scale") {
coalescedRDD(numIters)
}
}
}