99 changes: 49 additions & 50 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -70,23 +70,23 @@ private[spark] case class CoalescedRDDPartition(
* parent partitions
* @param prev RDD to be coalesced
* @param maxPartitions number of desired partitions in the coalesced RDD (must be positive)
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
* @param partitionCoalescer [[PartitionCoalescer]] implementation to use for coalescing
*/
private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
partitionCoalescer: Option[PartitionCoalescer] = None)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies

require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
s"Number of partitions ($maxPartitions) must be positive.")

override def getPartitions: Array[Partition] = {
val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())

pc.run().zipWithIndex.map {
pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) =>
val ids = pg.arr.map(_.index).toArray
val ids = pg.partitions.map(_.index).toArray
new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}
@@ -144,15 +144,15 @@ private[spark] class CoalescedRDD[T: ClassTag](
* desired partitions is greater than the number of preferred machines (can happen), it needs to
* start picking duplicate preferred machines. This is determined using coupon collector estimation
* (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
* it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
* bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
* according to locality. (contact alig for questions)
*
* it tries to also achieve locality. This is done by allowing a slack (balanceSlack, where
* 1.0 is all locality, 0 is all balance) between two bins. If two bins are within the slack
* in terms of balance, the algorithm will assign partitions according to locality.
* (contact alig for questions)
*/

private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {

def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
extends PartitionCoalescer {
def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)

@@ -167,14 +167,10 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
// hash used for the first maxPartitions (to avoid duplicates)
val initialHash = mutable.Set[Partition]()

// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
val slack = (balanceSlack * prev.partitions.length).toInt

var noLocality = true // true if no preferredLocations exist for parent RDD

// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
def currPrefLocs(part: Partition): Seq[String] = {
def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}

@@ -192,7 +188,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
def resetIterator(): Iterator[(String, Partition)] = {
val iterators = (0 to 2).map { x =>
prev.partitions.iterator.flatMap { p =>
if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
if (currPrefLocs(p, prev).size > x) Some((currPrefLocs(p, prev)(x), p)) else None
}
}
iterators.reduceLeft((x, y) => x ++ y)
@@ -215,16 +211,17 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
/**
* Sorts and gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
*
* @param key string representing a partitioned group on preferred machine key
* @return Option of PartitionGroup that has least elements for key
* @return Option of [[PartitionGroup]] that has least elements for key
*/
def getLeastGroupHash(key: String): Option[PartitionGroup] = {
groupHash.get(key).map(_.sortWith(compare).head)
}

def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
pgroup.arr += part // already assign this element
pgroup.partitions += part // already assign this element
initialHash += part // needed to avoid assigning partitions to multiple buckets
true
} else { false }
@@ -236,12 +233,12 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
* until it has seen most of the preferred locations (2 * n log(n))
* @param targetLen
*/
def setupGroups(targetLen: Int) {
def setupGroups(targetLen: Int, prev: RDD[_]) {
val rotIt = new LocationIterator(prev)

// deal with empty case, just create targetLen partition groups with no preferred location
if (!rotIt.hasNext) {
(1 to targetLen).foreach(x => groupArr += PartitionGroup())
(1 to targetLen).foreach(x => groupArr += new PartitionGroup())
return
}

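As a concrete number for the coupon-collector estimate referenced in the setupGroups docstring (illustrative arithmetic only, not part of the patch): with n = 20 distinct preferred hosts, roughly 2 * n * log(n) ≈ 120 draws from the location iterator are expected before most hosts have been seen.

// Illustrative only: expected number of (host, partition) samples needed to
// cover most of n distinct preferred hosts, per the 2 * n * log(n) estimate.
val n = 20
val expectedSamples = (2 * n * math.log(n)).toInt // ~119
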
@@ -259,7 +256,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
tries += 1
val (nxt_replica, nxt_part) = rotIt.next()
if (!groupHash.contains(nxt_replica)) {
val pgroup = PartitionGroup(nxt_replica)
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
addPartToPGroup(nxt_part, pgroup)
groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
@@ -269,7 +266,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:

while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
var (nxt_replica, nxt_part) = rotIt.next()
val pgroup = PartitionGroup(nxt_replica)
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
var tries = 0
@@ -285,73 +282,75 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
/**
* Takes a parent RDD partition and decides which of the partition groups to put it in
* Takes locality into account, but also uses power of 2 choices to load balance
* It strikes a balance between the two use the balanceSlack variable
* It strikes a balance between the two using the balanceSlack variable
* @param p partition (ball to be thrown)
* @param balanceSlack determines the trade-off between load-balancing the partitions sizes and
* their locality. e.g., balanceSlack=0.10 means that it allows up to 10%
* imbalance in favor of locality
* @return partition group (bin to be put in)
*/
def pickBin(p: Partition): PartitionGroup = {
val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
// least loaded pref locs
val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
val prefPart = if (pref == Nil) None else pref.head

val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
val minPowerOfTwo = {
if (groupArr(r1).numPartitions < groupArr(r2).numPartitions) {
groupArr(r1)
}
else {
groupArr(r2)
}
}
if (prefPart.isEmpty) {
// if no preferred locations, just use basic power of two
return minPowerOfTwo
}

val prefPartActual = prefPart.get

if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
// more imbalance than the slack allows
if (minPowerOfTwo.numPartitions + slack <= prefPartActual.numPartitions) {
minPowerOfTwo // prefer balance over locality
} else {
prefPartActual // prefer locality over balance
}
}

def throwBalls() {
def throwBalls(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p, i) <- prev.partitions.zipWithIndex) {
groupArr(i).arr += p
groupArr(i).partitions += p
}
} else { // no locality available, then simply split partitions based on positions in array
for (i <- 0 until maxPartitions) {
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
(rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
(rangeStart until rangeEnd).foreach{ j => groupArr(i).partitions += prev.partitions(j) }
}
}
} else {
for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
pickBin(p).arr += p
pickBin(p, prev, balanceSlack).partitions += p
}
}
}

def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.numPartitions > 0).toArray

/**
* Runs the packing algorithm and returns an array of PartitionGroups that if possible are
* load balanced and grouped by locality
* @return array of partition groups
*
* @return array of partition groups
*/
def run(): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
throwBalls() // assign partitions (balls) to each group (bins)
def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins)
throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins)
getPartitions
}
}

private case class PartitionGroup(prefLoc: Option[String] = None) {
var arr = mutable.ArrayBuffer[Partition]()
def size: Int = arr.size
}

private object PartitionGroup {
def apply(prefLoc: String): PartitionGroup = {
require(prefLoc != "", "Preferred location must not be empty")
PartitionGroup(Some(prefLoc))
}
}
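
To make the balanceSlack trade-off above concrete, here is a minimal standalone sketch of the decision pickBin makes between its two candidate groups (illustrative names, not part of the patch): with balanceSlack = 0.10 and 100 parent partitions, slack = 10, so the preferred-location group wins unless it already holds at least 10 more partitions than the smaller of the two randomly sampled groups.

// Illustrative sketch of the slack rule used by pickBin above.
def chooseGroup(
    minPowerOfTwo: PartitionGroup, // smaller of two randomly sampled groups
    prefGroup: PartitionGroup, // least-loaded group on a preferred host
    balanceSlack: Double,
    numParentPartitions: Int): PartitionGroup = {
  val slack = (balanceSlack * numParentPartitions).toInt
  if (minPowerOfTwo.numPartitions + slack <= prefGroup.numPartitions) {
    minPowerOfTwo // imbalance exceeds the slack: prefer balance
  } else {
    prefGroup // within the slack: prefer locality
  }
}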
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -433,7 +433,9 @@ abstract class RDD[T: ClassTag](
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
@@ -451,9 +453,10 @@ abstract class RDD[T: ClassTag](
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions)
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}

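A brief usage sketch of the extended signature (the coalescer instance is hypothetical; a sample implementation of the new PartitionCoalescer trait is sketched after the coalesce-public.scala file below). Existing call sites are unaffected because the new parameter defaults to Option.empty:

// Assumes an existing SparkContext `sc` and a hypothetical ChunkingCoalescer
// implementing the new PartitionCoalescer trait.
val rdd = sc.parallelize(1 to 1000, 100)

// Unchanged behavior: default coalescer, no shuffle.
val byDefault = rdd.coalesce(10)

// New: plug in a custom coalescer via the added parameter.
val byCustom = rdd.coalesce(10, shuffle = false, partitionCoalescer = Some(new ChunkingCoalescer()))
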
52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/coalesce-public.scala
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.collection.mutable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Partition

/**
* ::DeveloperApi::
* A PartitionCoalescer defines how to coalesce the partitions of a given RDD.
*/
@DeveloperApi
trait PartitionCoalescer {

/**
* Coalesce the partitions of the given RDD.
*
* @param maxPartitions the maximum number of partitions to have after coalescing
* @param parent the parent RDD whose partitions to coalesce
* @return an array of [[PartitionGroup]]s, where each element is itself an array of
* [[Partition]]s and represents a partition after coalescing is performed.
*/
def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup]
}

/**
* ::DeveloperApi::
* A group of [[Partition]]s
* @param prefLoc preferred location for the partition group
*/
@DeveloperApi
class PartitionGroup(val prefLoc: Option[String] = None) {
val partitions = mutable.ArrayBuffer[Partition]()
def numPartitions: Int = partitions.size
}
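
To illustrate the new developer API, here is a minimal sketch of a custom coalescer (not part of this patch; ChunkingCoalescer is a hypothetical name). It ignores locality and simply splits the parent's partitions into contiguous, roughly equal chunks, much like the noLocality branch of DefaultPartitionCoalescer:

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Hypothetical example implementation of the PartitionCoalescer trait above.
class ChunkingCoalescer extends PartitionCoalescer {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val numParent = parent.partitions.length
    val numGroups = math.min(maxPartitions, numParent)
    val groups = Array.fill(numGroups)(new PartitionGroup())
    parent.partitions.zipWithIndex.foreach { case (p, i) =>
      // Assign contiguous ranges of parent partitions to each group.
      groups((i.toLong * numGroups / numParent).toInt).partitions += p
    }
    groups.filter(_.numPartitions > 0)
  }
}

Such an implementation can then be passed to RDD.coalesce through the new partitionCoalescer parameter shown above.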