Skip to content

Commit 4d7bae2

Browse files
committed
Merge branch 'master' into issues/SPARK-3063
2 parents 9321379 + c77f406 commit 4d7bae2

File tree

63 files changed

+1343
-1097
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1343
-1097
lines changed

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
6666

6767
/**
6868
* Whether the cleaning thread will block on cleanup tasks.
69-
* This is set to true only for tests.
69+
*
70+
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
71+
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
72+
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
73+
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
74+
* longer in scope.
7075
*/
7176
private val blockOnCleanupTasks = sc.conf.getBoolean(
72-
"spark.cleaner.referenceTracking.blocking", false)
77+
"spark.cleaner.referenceTracking.blocking", true)
7378

7479
@volatile private var stopped = false
7580

@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
174179
private def blockManagerMaster = sc.env.blockManager.master
175180
private def broadcastManager = sc.env.broadcastManager
176181
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
177-
178-
// Used for testing. These methods explicitly blocks until cleanup is completed
179-
// to ensure that more reliable testing.
180182
}
181183

182184
private object ContextCleaner {

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
210210
"MapOutputTracker",
211211
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
212212

213+
// Let the user specify short names for shuffle managers
214+
val shortShuffleMgrNames = Map(
215+
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
216+
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
217+
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
218+
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
219+
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
220+
221+
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
222+
213223
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
214224
"BlockManagerMaster",
215225
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
216226

217227
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
218-
serializer, conf, securityManager, mapOutputTracker)
228+
serializer, conf, securityManager, mapOutputTracker, shuffleManager)
219229

220230
val connectionManager = blockManager.connectionManager
221231

@@ -250,16 +260,6 @@ object SparkEnv extends Logging {
250260
"."
251261
}
252262

253-
// Let the user specify short names for shuffle managers
254-
val shortShuffleMgrNames = Map(
255-
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
256-
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
257-
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
258-
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
259-
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
260-
261-
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
262-
263263
// Warn about deprecated spark.cache.class property
264264
if (conf.contains("spark.cache.class")) {
265265
logWarning("The spark.cache.class property is no longer being used! Specify storage " +

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
315315
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
316316
}
317317

318+
def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
319+
val file = new DataInputStream(new FileInputStream(filename))
320+
val length = file.readInt()
321+
val obj = new Array[Byte](length)
322+
file.readFully(obj)
323+
sc.broadcast(obj)
324+
}
325+
318326
def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
319327
// The right way to implement this would be to use TypeTags to get the full
320328
// type of T. Since I don't want to introduce breaking changes throughout the

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.nio._
2222
import java.nio.channels._
2323
import java.nio.channels.spi._
2424
import java.net._
25+
import java.util.{Timer, TimerTask}
2526
import java.util.concurrent.atomic.AtomicInteger
2627

2728
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
6162
var ackMessage: Option[Message] = None
6263

6364
def markDone(ackMessage: Option[Message]) {
64-
this.synchronized {
65-
this.ackMessage = ackMessage
66-
completionHandler(this)
67-
}
65+
this.ackMessage = ackMessage
66+
completionHandler(this)
6867
}
6968
}
7069

7170
private val selector = SelectorProvider.provider.openSelector()
71+
private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
7272

7373
// default to 30 second timeout waiting for authentication
7474
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
75+
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
7576

7677
private val handleMessageExecutor = new ThreadPoolExecutor(
7778
conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -652,19 +653,27 @@ private[spark] class ConnectionManager(
652653
}
653654
}
654655
if (bufferMessage.hasAckId()) {
655-
val sentMessageStatus = messageStatuses.synchronized {
656+
messageStatuses.synchronized {
656657
messageStatuses.get(bufferMessage.ackId) match {
657658
case Some(status) => {
658659
messageStatuses -= bufferMessage.ackId
659-
status
660+
status.markDone(Some(message))
660661
}
661662
case None => {
662-
throw new Exception("Could not find reference for received ack message " +
663-
message.id)
663+
/**
664+
* We can fall down on this code because of following 2 cases
665+
*
666+
* (1) Invalid ack sent due to buggy code.
667+
*
668+
* (2) Late-arriving ack for a SendMessageStatus
669+
* To avoid unwilling late-arriving ack
670+
* caused by long pause like GC, you can set
671+
* larger value than default to spark.core.connection.ack.wait.timeout
672+
*/
673+
logWarning(s"Could not find reference for received ack Message ${message.id}")
664674
}
665675
}
666676
}
667-
sentMessageStatus.markDone(Some(message))
668677
} else {
669678
var ackMessage : Option[Message] = None
670679
try {
@@ -836,9 +845,23 @@ private[spark] class ConnectionManager(
836845
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
837846
: Future[Message] = {
838847
val promise = Promise[Message]()
848+
849+
val timeoutTask = new TimerTask {
850+
override def run(): Unit = {
851+
messageStatuses.synchronized {
852+
messageStatuses.remove(message.id).foreach ( s => {
853+
promise.failure(
854+
new IOException(s"sendMessageReliably failed because ack " +
855+
"was not received within ${ackTimeout} sec"))
856+
})
857+
}
858+
}
859+
}
860+
839861
val status = new MessageStatus(message, connectionManagerId, s => {
862+
timeoutTask.cancel()
840863
s.ackMessage match {
841-
case None => // Indicates a failure where we either never sent or never got ACK'd
864+
case None => // Indicates a failure where we either never sent or never got ACK'd
842865
promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
843866
case Some(ackMessage) =>
844867
if (ackMessage.hasError) {
@@ -852,6 +875,8 @@ private[spark] class ConnectionManager(
852875
messageStatuses.synchronized {
853876
messageStatuses += ((message.id, status))
854877
}
878+
879+
ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
855880
sendMessage(connectionManagerId, message)
856881
promise.future
857882
}
@@ -861,6 +886,7 @@ private[spark] class ConnectionManager(
861886
}
862887

863888
def stop() {
889+
ackTimeoutMonitor.cancel()
864890
selectorThread.interrupt()
865891
selectorThread.join()
866892
selector.close()

core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,34 +63,35 @@ extends DeserializationStream {
6363
def close() { objIn.close() }
6464
}
6565

66+
6667
private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader)
6768
extends SerializerInstance {
6869

69-
def serialize[T: ClassTag](t: T): ByteBuffer = {
70+
override def serialize[T: ClassTag](t: T): ByteBuffer = {
7071
val bos = new ByteArrayOutputStream()
7172
val out = serializeStream(bos)
7273
out.writeObject(t)
7374
out.close()
7475
ByteBuffer.wrap(bos.toByteArray)
7576
}
7677

77-
def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
78+
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
7879
val bis = new ByteBufferInputStream(bytes)
7980
val in = deserializeStream(bis)
80-
in.readObject().asInstanceOf[T]
81+
in.readObject()
8182
}
8283

83-
def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
84+
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
8485
val bis = new ByteBufferInputStream(bytes)
8586
val in = deserializeStream(bis, loader)
86-
in.readObject().asInstanceOf[T]
87+
in.readObject()
8788
}
8889

89-
def serializeStream(s: OutputStream): SerializationStream = {
90+
override def serializeStream(s: OutputStream): SerializationStream = {
9091
new JavaSerializationStream(s, counterReset)
9192
}
9293

93-
def deserializeStream(s: InputStream): DeserializationStream = {
94+
override def deserializeStream(s: InputStream): DeserializationStream = {
9495
new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader)
9596
}
9697

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class KryoSerializer(conf: SparkConf)
9191
Thread.currentThread.setContextClassLoader(classLoader)
9292
reg.registerClasses(kryo)
9393
} catch {
94-
case e: Exception =>
94+
case e: Exception =>
9595
throw new SparkException(s"Failed to invoke $regCls", e)
9696
} finally {
9797
Thread.currentThread.setContextClassLoader(oldClassLoader)
@@ -106,7 +106,7 @@ class KryoSerializer(conf: SparkConf)
106106
kryo
107107
}
108108

109-
def newInstance(): SerializerInstance = {
109+
override def newInstance(): SerializerInstance = {
110110
new KryoSerializerInstance(this)
111111
}
112112
}
@@ -115,20 +115,20 @@ private[spark]
115115
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
116116
val output = new KryoOutput(outStream)
117117

118-
def writeObject[T: ClassTag](t: T): SerializationStream = {
118+
override def writeObject[T: ClassTag](t: T): SerializationStream = {
119119
kryo.writeClassAndObject(output, t)
120120
this
121121
}
122122

123-
def flush() { output.flush() }
124-
def close() { output.close() }
123+
override def flush() { output.flush() }
124+
override def close() { output.close() }
125125
}
126126

127127
private[spark]
128128
class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
129-
val input = new KryoInput(inStream)
129+
private val input = new KryoInput(inStream)
130130

131-
def readObject[T: ClassTag](): T = {
131+
override def readObject[T: ClassTag](): T = {
132132
try {
133133
kryo.readClassAndObject(input).asInstanceOf[T]
134134
} catch {
@@ -138,31 +138,31 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
138138
}
139139
}
140140

141-
def close() {
141+
override def close() {
142142
// Kryo's Input automatically closes the input stream it is using.
143143
input.close()
144144
}
145145
}
146146

147147
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
148-
val kryo = ks.newKryo()
148+
private val kryo = ks.newKryo()
149149

150150
// Make these lazy vals to avoid creating a buffer unless we use them
151-
lazy val output = ks.newKryoOutput()
152-
lazy val input = new KryoInput()
151+
private lazy val output = ks.newKryoOutput()
152+
private lazy val input = new KryoInput()
153153

154-
def serialize[T: ClassTag](t: T): ByteBuffer = {
154+
override def serialize[T: ClassTag](t: T): ByteBuffer = {
155155
output.clear()
156156
kryo.writeClassAndObject(output, t)
157157
ByteBuffer.wrap(output.toBytes)
158158
}
159159

160-
def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
160+
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
161161
input.setBuffer(bytes.array)
162162
kryo.readClassAndObject(input).asInstanceOf[T]
163163
}
164164

165-
def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
165+
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
166166
val oldClassLoader = kryo.getClassLoader
167167
kryo.setClassLoader(loader)
168168
input.setBuffer(bytes.array)
@@ -171,11 +171,11 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
171171
obj
172172
}
173173

174-
def serializeStream(s: OutputStream): SerializationStream = {
174+
override def serializeStream(s: OutputStream): SerializationStream = {
175175
new KryoSerializationStream(kryo, s)
176176
}
177177

178-
def deserializeStream(s: InputStream): DeserializationStream = {
178+
override def deserializeStream(s: InputStream): DeserializationStream = {
179179
new KryoDeserializationStream(kryo, s)
180180
}
181181
}

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
4343
* They are intended to be used to serialize/de-serialize data within a single Spark application.
4444
*/
4545
@DeveloperApi
46-
trait Serializer {
46+
abstract class Serializer {
4747

4848
/**
4949
* Default ClassLoader to use in deserialization. Implementations of [[Serializer]] should
@@ -61,10 +61,12 @@ trait Serializer {
6161
this
6262
}
6363

64+
/** Creates a new [[SerializerInstance]]. */
6465
def newInstance(): SerializerInstance
6566
}
6667

6768

69+
@DeveloperApi
6870
object Serializer {
6971
def getSerializer(serializer: Serializer): Serializer = {
7072
if (serializer == null) SparkEnv.get.serializer else serializer
@@ -81,7 +83,7 @@ object Serializer {
8183
* An instance of a serializer, for use by one thread at a time.
8284
*/
8385
@DeveloperApi
84-
trait SerializerInstance {
86+
abstract class SerializerInstance {
8587
def serialize[T: ClassTag](t: T): ByteBuffer
8688

8789
def deserialize[T: ClassTag](bytes: ByteBuffer): T
@@ -91,29 +93,14 @@ trait SerializerInstance {
9193
def serializeStream(s: OutputStream): SerializationStream
9294

9395
def deserializeStream(s: InputStream): DeserializationStream
94-
95-
def serializeMany[T: ClassTag](iterator: Iterator[T]): ByteBuffer = {
96-
// Default implementation uses serializeStream
97-
val stream = new ByteArrayOutputStream()
98-
serializeStream(stream).writeAll(iterator)
99-
val buffer = ByteBuffer.wrap(stream.toByteArray)
100-
buffer.flip()
101-
buffer
102-
}
103-
104-
def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
105-
// Default implementation uses deserializeStream
106-
buffer.rewind()
107-
deserializeStream(new ByteBufferInputStream(buffer)).asIterator
108-
}
10996
}
11097

11198
/**
11299
* :: DeveloperApi ::
113100
* A stream for writing serialized objects.
114101
*/
115102
@DeveloperApi
116-
trait SerializationStream {
103+
abstract class SerializationStream {
117104
def writeObject[T: ClassTag](t: T): SerializationStream
118105
def flush(): Unit
119106
def close(): Unit
@@ -132,7 +119,7 @@ trait SerializationStream {
132119
* A stream for reading serialized objects.
133120
*/
134121
@DeveloperApi
135-
trait DeserializationStream {
122+
abstract class DeserializationStream {
136123
def readObject[T: ClassTag](): T
137124
def close(): Unit
138125

0 commit comments

Comments
 (0)