
Commit 3f160c2

Merge pull request #19 from markhamstra/master-csd
SKIPME Pickup recent bug fixes from Apache branch-0.9
2 parents 399d0c5 + 69d5f87 commit 3f160c2

20 files changed: +278 additions, -52 deletions


core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 6 additions & 3 deletions
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, Logging}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -60,12 +61,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
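With this change the LaunchTask message carries serialized bytes, and the executor rebuilds the TaskDescription with the closure serializer before launching it. Below is a minimal, self-contained sketch of that serialize-on-the-driver / deserialize-on-the-executor round trip; it uses plain JDK serialization and a made-up DemoTask class as stand-ins for SparkEnv.get.closureSerializer and the real TaskDescription.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for TaskDescription, just to illustrate the round trip.
case class DemoTask(taskId: Long, serializedTask: Array[Byte]) extends Serializable

object LaunchTaskRoundTrip {
  // Driver side: turn the task into an opaque ByteBuffer before it is sent over the wire.
  def serialize(task: DemoTask): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(task)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // Executor side: decode the buffer back into a task before launching it.
  def deserialize(data: ByteBuffer): DemoTask = {
    val arr = new Array[Byte](data.remaining())
    data.get(arr)
    new ObjectInputStream(new ByteArrayInputStream(arr)).readObject().asInstanceOf[DemoTask]
  }

  def main(args: Array[String]): Unit = {
    val taskDesc = deserialize(serialize(DemoTask(42L, Array.fill(16)(0: Byte))))
    println(s"Got assigned task ${taskDesc.taskId}")
  }
}
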
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
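
LaunchTask now wraps the pre-serialized task in a SerializableBuffer, so the message body is just bytes. As an illustration only (the DemoSerializableBuffer class below is made up and is not Spark's actual org.apache.spark.util.SerializableBuffer), a serializable wrapper around a java.nio.ByteBuffer can be sketched like this:

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Illustrative wrapper that lets a ByteBuffer survive Java serialization
// by writing its contents as a plain byte array.
class DemoSerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
  def value: ByteBuffer = buffer

  private def writeObject(out: ObjectOutputStream): Unit = {
    val copy = buffer.duplicate()          // leave the original position untouched
    out.writeInt(copy.remaining())
    val bytes = new Array[Byte](copy.remaining())
    copy.get(bytes)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    buffer = ByteBuffer.wrap(bytes)
  }
}

A driver would wrap the serialized task's ByteBuffer in such a holder before sending, and the executor would read value back out, as the backend code above does with data.value.
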

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 23 additions & 4 deletions
@@ -27,12 +27,12 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{SparkException, SparkEnv, Logging, TaskState}
 import org.apache.spark.{Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
   WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -50,6 +50,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -139,8 +140,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= 1
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+            try {
+              var msg = "Serialized task %s:%d was %d bytes which " +
+                "exceeds spark.akka.frameSize (%d bytes). " +
+                "Consider using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              taskSet.abort(msg)
+            } catch {
+              case e: Exception => logError("Exception in error callback", e)
+            }
+          }
+        }
+        else {
+          freeCores(task.executorId) -= 1
+          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+        }
       }
     }
 
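The new code measures each serialized task against the Akka frame size and aborts the task set with a pointed error message instead of letting the oversized send fail deep inside Akka remoting. Here is a standalone sketch of the same guard; the frame size value, the 1024-byte headroom, and the send/abort callbacks are hypothetical stand-ins for AkkaUtils.maxFrameSizeBytes(conf) and the scheduler's abort path.

import java.nio.ByteBuffer

object FrameSizeGuard {
  // Hypothetical stand-ins: in Spark these come from the configured spark.akka.frameSize
  // and the task set's abort callback.
  val akkaFrameSize: Int = 10 * 1024 * 1024   // e.g. a 10 MB frame size
  val reservedOverhead: Int = 1024            // headroom for message framing, as in the patch

  def checkAndSend(taskId: Long, index: Int, serializedTask: ByteBuffer)
                  (send: ByteBuffer => Unit, abort: String => Unit): Unit = {
    if (serializedTask.limit >= akkaFrameSize - reservedOverhead) {
      // Too big for a single Akka message: fail fast with a hint instead of a remoting error.
      abort(("Serialized task %s:%d was %d bytes which exceeds spark.akka.frameSize " +
        "(%d bytes). Consider using broadcast variables for large values.")
        .format(taskId, index, serializedTask.limit, akkaFrameSize))
    } else {
      send(serializedTask)
    }
  }

  def main(args: Array[String]): Unit = {
    val small = ByteBuffer.allocate(1024)
    checkAndSend(1L, 0, small)(_ => println("sent"), msg => println("aborted: " + msg))
  }
}
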
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 4 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -230,7 +231,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
@@ -245,13 +246,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
     * Assume the given iterator is in sorted order.
     */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
          kc = it.next()
          kcPairs += kc
        }
@@ -324,7 +325,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     *
     * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
     */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0
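
The switch to BufferedIterator matters because the merge loop needs to peek at the next pair's key hash before consuming it; the old code compared the hash of a pair it had already taken, which could silently pull a pair from the next hash group into the current one. A small sketch of the peek-then-take pattern, independent of ExternalAppendOnlyMap's internals ("Aa" and "BB" really do share a hash code on the JVM):

import scala.collection.mutable.ArrayBuffer

object HashGroupedRead {
  // Collect the leading run of pairs whose key hash equals the hash of the first pair,
  // peeking with `head` so no pair from the next group is consumed by mistake.
  def takeSameHash[K, V](it: BufferedIterator[(K, V)]): ArrayBuffer[(K, V)] = {
    val pairs = new ArrayBuffer[(K, V)]
    if (it.hasNext) {
      pairs += it.next()
      val minHash = pairs.head._1.hashCode()
      while (it.hasNext && it.head._1.hashCode() == minHash) {
        pairs += it.next()
      }
    }
    pairs
  }

  def main(args: Array[String]): Unit = {
    // Keys sorted by hash code; the first two collide in this toy example.
    val sorted = Iterator(("Aa", 1), ("BB", 2), ("zzz", 3)).buffered
    println(takeSameHash(sorted))   // ArrayBuffer((Aa,1), (BB,2))
  }
}
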
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+  test("serialized task larger than akka frame size") {
+    val conf = new SparkConf
+    conf.set("spark.akka.frameSize","1")
+    conf.set("spark.default.parallelism","1")
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+    val larger = sc.parallelize(Seq(buffer))
+    val thrown = intercept[SparkException] {
+      larger.collect()
+    }
+    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    val smaller = sc.parallelize(1 to 4).collect()
+    assert(smaller.size === 4)
+  }
+
+}

core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala

Lines changed: 38 additions & 1 deletion
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       ("pomatoes", "eructation") // 568647356
     )
 
+    collisionPairs.foreach { case (w1, w2) =>
+      // String.hashCode is documented to use a specific algorithm, but check just in case
+      assert(w1.hashCode === w2.hashCode)
+    }
+
     (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(kv._2.equals(expectedValue))
       count += 1
     }
-    assert(count == 100000 + collisionPairs.size * 2)
+    assert(count === 100000 + collisionPairs.size * 2)
+  }
+
+  test("spilling with many hash collisions") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.0001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+
+    // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+    // problems if the map fails to group together the objects with the same code (SPARK-2043).
+    for (i <- 1 to 10) {
+      for (j <- 1 to 10000) {
+        map.insert(FixedHashObject(j, j % 2), 1)
+      }
+    }
+
+    val it = map.iterator
+    var count = 0
+    while (it.hasNext) {
+      val kv = it.next()
+      assert(kv._2 === 10)
+      count += 1
+    }
+    assert(count === 10000)
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 }
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
+  override def hashCode(): Int = h
+}

ec2/spark_ec2.py

Lines changed: 12 additions & 2 deletions
@@ -194,7 +194,12 @@ def get_spark_ami(opts):
     "c3.xlarge": "pvm",
     "c3.2xlarge": "pvm",
     "c3.4xlarge": "pvm",
-    "c3.8xlarge": "pvm"
+    "c3.8xlarge": "pvm",
+    "r3.large": "hvm",
+    "r3.xlarge": "hvm",
+    "r3.2xlarge": "hvm",
+    "r3.4xlarge": "hvm",
+    "r3.8xlarge": "hvm"
   }
   if opts.instance_type in instance_types:
     instance_type = instance_types[opts.instance_type]
@@ -496,7 +501,12 @@ def get_num_disks(instance_type):
     "c3.xlarge": 2,
     "c3.2xlarge": 2,
     "c3.4xlarge": 2,
-    "c3.8xlarge": 2
+    "c3.8xlarge": 2,
+    "r3.large": 1,
+    "r3.xlarge": 1,
+    "r3.2xlarge": 1,
+    "r3.4xlarge": 1,
+    "r3.8xlarge": 2
   }
   if instance_type in disks_by_instance:
     return disks_by_instance[instance_type]

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 3 additions & 3 deletions
@@ -61,20 +61,20 @@ class SparkFlumeEvent() extends Externalizable {
   def readExternal(in: ObjectInput) {
     val bodyLength = in.readInt()
     val bodyBuff = new Array[Byte](bodyLength)
-    in.read(bodyBuff)
+    in.readFully(bodyBuff)
 
     val numHeaders = in.readInt()
     val headers = new java.util.HashMap[CharSequence, CharSequence]
 
     for (i <- 0 until numHeaders) {
       val keyLength = in.readInt()
       val keyBuff = new Array[Byte](keyLength)
-      in.read(keyBuff)
+      in.readFully(keyBuff)
       val key : String = Utils.deserialize(keyBuff)
 
       val valLength = in.readInt()
       val valBuff = new Array[Byte](valLength)
-      in.read(valBuff)
+      in.readFully(valBuff)
       val value : String = Utils.deserialize(valBuff)
 
       headers.put(key, value)
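
The switch to readFully matters because a read(byte[]) call is allowed to return after filling only part of the buffer, which would silently truncate the Flume event body, keys, and values; readFully keeps reading until the array is full or throws if the stream ends early. A minimal sketch of the loop that readFully-style reading implies (the helper name below is made up for illustration):

import java.io.{EOFException, InputStream}

object FullyRead {
  // What readFully guarantees and a bare read() does not: keep reading until the
  // whole buffer is filled, or fail loudly if the stream ends first.
  def readWholeBuffer(in: InputStream, buf: Array[Byte]): Unit = {
    var offset = 0
    while (offset < buf.length) {
      val n = in.read(buf, offset, buf.length - offset)   // may return a short count
      if (n < 0) throw new EOFException(s"stream ended after $offset of ${buf.length} bytes")
      offset += n
    }
  }
}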

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 2 additions & 1 deletion
@@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
+    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
+    p.next._2.iterator.map(_.copy())
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 12 additions & 3 deletions
@@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * Construct a new edge partition by applying the function f to all
    * edges in this partition.
    *
+   * Be careful not to keep references to the objects passed to `f`.
+   * To improve GC performance the same object is re-used for each call.
+   *
    * @param f a function from an edge to a new attribute
    * @tparam ED2 the type of the new attribute
    * @return a new edge partition with the result of the function `f`
@@ -84,12 +87,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    * order of the edges returned by `EdgePartition.iterator` and
    * should return attributes equal to the number of edges.
    *
-   * @param f a function from an edge to a new attribute
+   * @param iter an iterator for the new attribute values
    * @tparam ED2 the type of the new attribute
-   * @return a new edge partition with the result of the function `f`
-   *         applied to each edge
+   * @return a new edge partition with the attribute values replaced
    */
   def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+    // Faster than iter.toArray, because the expected size is known.
     val newData = new Array[ED2](data.size)
     var i = 0
     while (iter.hasNext) {
@@ -188,6 +191,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   /**
    * Get an iterator over the edges in this partition.
    *
+   * Be careful not to keep references to the objects from this iterator.
+   * To improve GC performance the same object is re-used in `next()`.
+   *
    * @return an iterator over edges in the partition
    */
   def iterator = new Iterator[Edge[ED]] {
@@ -216,6 +222,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
   /**
    * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
   * cluster must start at position `index`.
+   *
+   * Be careful not to keep references to the objects from this iterator. To improve GC performance
+   * the same object is re-used in `next()`.
    */
   private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
     private[this] val edge = new Edge[ED]