
Commit 4567ffc

Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
2 parents: 6a91b14 + f350cd3

12 files changed: +179 additions, −174 deletions


core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 2 additions & 5 deletions
@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit,
-      resultFunc: => R): R = {
+      resultFunc: => R) {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
     val job = this.synchronized {
@@ -223,10 +223,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
     try {
-      Await.ready(job, Duration.Inf).value.get match {
-        case scala.util.Failure(e) => throw e
-        case scala.util.Success(v) => v
-      }
+      Await.ready(job, Duration.Inf)
     } catch {
       case e: InterruptedException =>
         job.cancel()
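
Note on the hunk above: runJob no longer extracts the job's value from the awaited future (its return type drops from R to Unit); it only blocks until the job finishes, and the final value comes from the by-name resultFunc argument, evaluated afterwards by the surrounding run { ... } block. A minimal sketch of the two await styles using plain scala.concurrent, with a generic future standing in for the submitted job:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object AwaitStyles {
  // Old style: pull the value out of the future, rethrowing any failure.
  def awaitValue[T](job: Future[T]): T =
    Await.ready(job, Duration.Inf).value.get match {
      case scala.util.Failure(e) => throw e
      case scala.util.Success(v) => v
    }

  // New style: only block until the job is done; the result is produced elsewhere (resultFunc).
  def awaitCompletion[T](job: Future[T]): Unit = {
    Await.ready(job, Duration.Inf)
  }
}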

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 6 additions & 23 deletions
@@ -29,10 +29,6 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.{CollectionsUtils, Utils}
 import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
 
-import org.apache.spark.SparkContext.rddToAsyncRDDActions
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -117,12 +113,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]
 
   // An array of upper bounds for the first (partitions - 1) partitions
-  @volatile private var valRB: Array[K] = null
-
-  private def rangeBounds: Array[K] = this.synchronized {
-    if (valRB != null) return valRB
-
-    valRB = if (partitions <= 1) {
+  private var rangeBounds: Array[K] = {
+    if (partitions <= 1) {
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -160,8 +152,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         RangePartitioner.determineBounds(candidates, partitions)
       }
     }
-
-    valRB
   }
 
   def numPartitions = rangeBounds.length + 1
@@ -232,8 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = this.synchronized {
-    if (valRB != null) return
+  private def readObject(in: ObjectInputStream) {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()
@@ -245,7 +234,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         val ser = sfactory.newInstance()
         Utils.deserializeViaNestedStream(in, ser) { ds =>
           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
-          valRB = ds.readObject[Array[K]]()
+          rangeBounds = ds.readObject[Array[K]]()
         }
     }
   }
@@ -265,18 +254,12 @@ private[spark] object RangePartitioner {
       sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
     val shift = rdd.id
     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
-    // use collectAsync here to run this job as a future, which is cancellable
-    val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
+    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
       val seed = byteswap32(idx ^ (shift << 16))
       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
         iter, sampleSizePerPartition, seed)
       Iterator((idx, n, sample))
-    }.collectAsync()
-    // We do need the future's value to continue any further
-    val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
-      case scala.util.Success(v) => v.toArray
-      case scala.util.Failure(e) => throw e
-    }
+    }.collect()
     val numItems = sketched.map(_._2.toLong).sum
     (numItems, sketched)
   }
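
Note on the last hunk: RangePartitioner.sketch now runs as an ordinary blocking job, gathering one (partitionId, count, sample) record per partition with a plain collect() instead of collectAsync() plus Await, and rangeBounds is computed eagerly in the constructor rather than lazily under a lock. A rough sketch of the same gather pattern on a user RDD; the names and the prefix-based "sampling" below are illustrative stand-ins, since SamplingUtils.reservoirSampleAndCount is private to Spark:

import org.apache.spark.{SparkConf, SparkContext}

object SketchExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch-example").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100000, numSlices = 8)

    val sampleSizePerPartition = 20
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      // Stand-in for reservoir sampling: materialize the partition and keep a prefix as the sample.
      val items = iter.toArray
      Iterator((idx, items.length.toLong, items.take(sampleSizePerPartition)))
    }.collect()

    val numItems = sketched.map(_._2).sum
    println(s"sketched $numItems items across ${sketched.length} partitions")
    sc.stop()
  }
}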

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 26 additions & 38 deletions
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
-import org.apache.spark.util.Utils
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
 import org.apache.spark.annotation.Experimental
 
@@ -39,30 +38,29 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for counting the number of elements in the RDD.
    */
   def countAsync(): FutureAction[Long] = {
-    val f = new ComplexFutureAction[Long]
-    f.run {
-      val totalCount = new AtomicLong
-      f.runJob(self,
-        (iter: Iterator[T]) => Utils.getIteratorSize(iter),
-        Range(0, self.partitions.size),
-        (index: Int, data: Long) => totalCount.addAndGet(data),
-        totalCount.get())
-    }
+    val totalCount = new AtomicLong
+    self.context.submitJob(
+      self,
+      (iter: Iterator[T]) => {
+        var result = 0L
+        while (iter.hasNext) {
+          result += 1L
+          iter.next()
+        }
+        result
+      },
+      Range(0, self.partitions.size),
+      (index: Int, data: Long) => totalCount.addAndGet(data),
+      totalCount.get())
   }
 
   /**
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val f = new ComplexFutureAction[Seq[T]]
-    f.run {
-      val results = new Array[Array[T]](self.partitions.size)
-      f.runJob(self,
-        (iter: Iterator[T]) => iter.toArray,
-        Range(0, self.partitions.size),
-        (index: Int, data: Array[T]) => results(index) = data,
-        results.flatten.toSeq)
-    }
+    val results = new Array[Array[T]](self.partitions.size)
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+      (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
   /**
@@ -106,34 +104,24 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       }
       results.toSeq
     }
+
+    f
   }
 
   /**
    * Applies a function f to all elements of this RDD.
   */
-  def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    val exprClean = self.context.clean(expr)
-    f.run {
-      f.runJob(self,
-        (iter: Iterator[T]) => iter.foreach(exprClean),
-        Range(0, self.partitions.size),
-        (index: Int, data: Unit) => Unit,
-        Unit)
-    }
+  def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 
   /**
    * Applies a function f to each partition of this RDD.
   */
-  def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    f.run {
-      f.runJob(self,
-        expr,
-        Range(0, self.partitions.size),
-        (index: Int, data: Unit) => Unit,
-        Unit)
-    }
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 }
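
The async actions now call SparkContext.submitJob directly, whose parameters mirror the old ComplexFutureAction.runJob: an RDD, a per-partition function, the partitions to run on, a driver-side result handler, and a by-name resultFunc evaluated once the job completes. The same pattern works for user-defined async actions; a sketch with a made-up sumAsync helper (the submitJob call itself is the API shown in the diff):

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.FutureAction
import org.apache.spark.rdd.RDD

object AsyncSum {
  // Hypothetical helper built the same way countAsync is built above.
  def sumAsync(rdd: RDD[Long]): FutureAction[Long] = {
    val total = new AtomicLong
    rdd.context.submitJob[Long, Long, Long](
      rdd,
      (iter: Iterator[Long]) => iter.sum,                       // per-partition work
      Range(0, rdd.partitions.size),                            // run on every partition
      (index: Int, partial: Long) => total.addAndGet(partial),  // merge results on the driver
      total.get())                                              // resultFunc: final value
  }
}

The returned FutureAction is a regular Scala Future, so callers can block on it with Await.result, register onComplete callbacks, or cancel() the underlying job.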

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -194,7 +194,7 @@ class HadoopRDD[K, V](
       val jobConf = getJobConf()
       val inputFormat = getInputFormat(jobConf)
       HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+        context.getStageId, theSplit.index, context.getAttemptId.toInt, jobConf)
       reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
       // Register an on-task-completion callback to close the input stream.

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 1 deletion
@@ -86,7 +86,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
-      self.mapPartitionsWithContext((context, iter) => {
+      self.mapPartitions(iter => {
+        val context = TaskContext.get()
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
       }, preservesPartitioning = true)
     } else {
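
mapPartitionsWithContext handed the TaskContext in as a parameter; the replacement is a plain mapPartitions that reads the thread-local context via TaskContext.get() from inside the task. A minimal sketch of that access pattern, using the getters this commit introduces (TaskContextExample and tagWithTaskInfo are hypothetical names, not Spark API):

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

object TaskContextExample {
  // Hypothetical helper: tag each element with the stage and task attempt that processed it.
  def tagWithTaskInfo(rdd: RDD[String]): RDD[(Int, Long, String)] =
    rdd.mapPartitions({ iter =>
      val context = TaskContext.get()   // thread-local; only valid inside a running task
      iter.map(s => (context.getStageId, context.getAttemptId, s))
    }, preservesPartitioning = true)
}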

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 8 additions & 38 deletions
@@ -66,23 +66,19 @@ class ExternalAppendOnlyMap[K, V, C](
     mergeCombiners: (C, C) => C,
     serializer: Serializer = SparkEnv.get.serializer,
     blockManager: BlockManager = SparkEnv.get.blockManager)
-  extends Iterable[(K, C)] with Serializable with Logging {
+  extends Iterable[(K, C)]
+  with Serializable
+  with Logging
+  with Spillable[SizeTracker] {
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
-  private val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
   // Number of pairs inserted since last spill; note that we count them even if a value is merged
   // with a previous key in case we're doing something like groupBy where the result grows
-  private var elementsRead = 0L
-
-  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
-  private val trackMemoryThreshold = 1000
-
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  protected[this] var elementsRead = 0L
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -95,11 +91,7 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)
 
-  // How many times we have spilled so far
-  private var spillCount = 0
-
   // Number of bytes spilled in total
-  private var _memoryBytesSpilled = 0L
   private var _diskBytesSpilled = 0L
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
@@ -136,19 +128,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     while (entries.hasNext) {
       curEntry = entries.next()
-      if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
-          currentMap.estimateSize() >= myMemoryThreshold)
-      {
-        // Claim up to double our current memory from the shuffle memory pool
-        val currentMemory = currentMap.estimateSize()
-        val amountToRequest = 2 * currentMemory - myMemoryThreshold
-        val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
-        myMemoryThreshold += granted
-        if (myMemoryThreshold <= currentMemory) {
-          // We were granted too little memory to grow further (either tryToAcquire returned 0,
-          // or we already had more memory than myMemoryThreshold); spill the current collection
-          spill(currentMemory) // Will also release memory back to ShuffleMemoryManager
-        }
+      if (maybeSpill(currentMap, currentMap.estimateSize())) {
+        currentMap = new SizeTrackingAppendOnlyMap[K, C]
       }
       currentMap.changeValue(curEntry._1, update)
       elementsRead += 1
@@ -171,11 +152,7 @@ class ExternalAppendOnlyMap[K, V, C](
   /**
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
   */
-  private def spill(mapSize: Long): Unit = {
-    spillCount += 1
-    val threadId = Thread.currentThread().getId
-    logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
-      .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+  override protected[this] def spill(collection: SizeTracker): Unit = {
     val (blockId, file) = diskBlockManager.createTempBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
@@ -231,18 +208,11 @@ class ExternalAppendOnlyMap[K, V, C](
       }
     }
 
-    currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
 
-    // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
-
     elementsRead = 0
-    _memoryBytesSpilled += mapSize
   }
 
-  def memoryBytesSpilled: Long = _memoryBytesSpilled
   def diskBytesSpilled: Long = _diskBytesSpilled
 
   /**
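
The memory-tracking and spill bookkeeping that used to live inline in the insert loop now comes from the Spillable trait: the loop asks maybeSpill(collection, currentSize) and, if it spilled, swaps in a fresh map, while the subclass overrides spill(collection) to do the actual writing. A simplified sketch of that contract (not the real private[spark] trait, which also negotiates memory with the ShuffleMemoryManager; the threshold and check interval below are made up):

// Simplified sketch of the Spillable shape the diff above relies on.
trait SpillableSketch[C] {
  protected[this] var elementsRead = 0L
  private var memoryThreshold = 5L * 1024 * 1024   // assumed initial budget
  private var _memoryBytesSpilled = 0L

  /** Subclasses write the collection out to disk. */
  protected[this] def spill(collection: C): Unit

  /** Returns true if it spilled, so the caller can swap in a fresh collection. */
  protected[this] def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    if (elementsRead % 32 == 0 && currentMemory >= memoryThreshold) {
      spill(collection)
      _memoryBytesSpilled += currentMemory
      true
    } else {
      false
    }
  }

  def memoryBytesSpilled: Long = _memoryBytesSpilled
}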
