
Commit 19d5dd0

Merge github.com:apache/spark
2 parents f7f5bf0 + 0adc932

37 files changed: +88 -87 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -933,7 +933,6 @@ class SparkContext(config: SparkConf) extends Logging {
   def stop() {
     postApplicationEnd()
     ui.stop()
-    eventLogger.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
@@ -942,13 +941,14 @@ class SparkContext(config: SparkConf) extends Logging {
       metadataCleaner.cancel()
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
-      listenerBus.stop()
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
       ShuffleMapTask.clearCache()
       ResultTask.clearCache()
+      listenerBus.stop()
+      eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
     } else {
       logInfo("SparkContext already stopped")

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 19 additions & 13 deletions
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+  private val listenerThread = new Thread("SparkListenerBus") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        if (event == SparkListenerShutdown) {
+          // Get out of the while loop and shutdown the daemon thread
+          return
+        }
+        postToAll(event)
+      }
+    }
+  }
+
+  // Exposed for testing
+  @volatile private[spark] var stopCalled = false
 
   /**
    * Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     if (started) {
       throw new IllegalStateException("Listener bus already started!")
     }
+    listenerThread.start()
     started = true
-    new Thread("SparkListenerBus") {
-      setDaemon(true)
-      override def run() {
-        while (true) {
-          val event = eventQueue.take
-          if (event == SparkListenerShutdown) {
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          }
-          postToAll(event)
-        }
-      }
-    }.start()
   }
 
   def post(event: SparkListenerEvent, blocking: Boolean = false) {
@@ -98,9 +102,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   }
 
   def stop() {
+    stopCalled = true
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
     }
     post(SparkListenerShutdown)
+    listenerThread.join()
   }
 }
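
For context: the hunks above replace the anonymous listener thread started inside start() with a named listenerThread field, so that stop() can post the SparkListenerShutdown sentinel and then join() the thread, returning only after every event already queued has been handled. Below is a minimal, self-contained sketch of that sentinel-and-join pattern; the names (EventBusSketch, EventBus, Message, Shutdown) are illustrative and are not part of Spark's API.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative sketch of the drain-on-stop pattern used in the hunks above:
// post a sentinel event, then join the consumer thread so that stop() returns
// only after everything already in the queue has been processed.
object EventBusSketch {
  sealed trait Event
  case class Message(body: String) extends Event
  case object Shutdown extends Event

  class EventBus(handle: String => Unit) {
    private val queue = new LinkedBlockingQueue[Event](1000)
    private val consumer = new Thread("event-bus-consumer") {
      setDaemon(true)
      override def run(): Unit = {
        while (true) {
          queue.take() match {
            case Shutdown => return          // sentinel: leave the loop, thread exits
            case Message(body) => handle(body)
          }
        }
      }
    }

    def start(): Unit = consumer.start()
    def post(event: Event): Unit = queue.put(event)

    def stop(): Unit = {
      queue.put(Shutdown)  // processed only after every event already in the queue
      consumer.join()      // block until the consumer has drained the queue
    }
  }

  def main(args: Array[String]): Unit = {
    val bus = new EventBus(body => println(s"handled: $body"))
    bus.start()
    (1 to 5).foreach(i => bus.post(Message(s"event $i")))
    bus.stop()              // returns only after all five events were handled
    println("bus stopped, queue drained")
  }
}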

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 45 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.Semaphore
+
 import scala.collection.mutable
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
@@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     }
   }
 
+  test("bus.stop() waits for the event queue to completely drain") {
+    @volatile var drained = false
+
+    // Tells the listener to stop blocking
+    val listenerWait = new Semaphore(1)
+
+    // When stop has returned
+    val stopReturned = new Semaphore(1)
+
+    class BlockingListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+        listenerWait.acquire()
+        drained = true
+      }
+    }
+
+    val bus = new LiveListenerBus
+    val blockingListener = new BlockingListener
+
+    bus.addListener(blockingListener)
+    bus.start()
+    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+
+    // the queue should not drain immediately
+    assert(!drained)
+
+    new Thread("ListenerBusStopper") {
+      override def run() {
+        // stop() will block until notify() is called below
+        bus.stop()
+        stopReturned.release(1)
+      }
+    }.start()
+
+    while (!bus.stopCalled) {
+      Thread.sleep(10)
+    }
+
+    listenerWait.release()
+    stopReturned.acquire()
+    assert(drained)
+  }
+
   test("basic creation of StageInfo") {
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)

examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala

Lines changed: 1 addition & 1 deletion
@@ -73,6 +73,6 @@ object SparkHdfsLR {
     }
 
     println("Final w: " + w)
-    System.exit(0)
+    sc.stop()
   }
 }
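
The example now ends with sc.stop() instead of System.exit(0), which lets the SparkContext shutdown sequence (including the listener bus and event logger reordered earlier in this commit) run before the JVM exits. A minimal sketch of that pattern, assuming a local master and an illustrative application name:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative driver program (not part of the commit): end with sc.stop()
// rather than System.exit(0) so SparkContext can shut down cleanly.
object StopInsteadOfExit {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StopInsteadOfExit").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val evens = sc.parallelize(1 to 100).filter(_ % 2 == 0).count()
    println("Number of even values: " + evens)
    sc.stop()  // clean shutdown; no explicit JVM exit needed
  }
}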

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 0 additions & 1 deletion
@@ -30,7 +30,6 @@ import org.apache.spark.rdd.RDD
 
 /**
  * :: DeveloperApi ::
- *
  * The Java stubs necessary for the Python mllib bindings.
  */
 @DeveloperApi

mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

Lines changed: 0 additions & 1 deletion
@@ -29,7 +29,6 @@ import org.apache.spark.rdd.RDD
 
 /**
  * :: Experimental ::
- *
 * Model for Naive Bayes Classifiers.
 *
 * @param labels list of labels

mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala

Lines changed: 0 additions & 4 deletions
@@ -78,7 +78,6 @@ class KMeans private (
 
   /**
    * :: Experimental ::
-   *
    * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
    * this many times with random starting conditions (configured by the initialization mode), then
    * return the best clustering found over any run. Default: 1.
@@ -398,9 +397,6 @@ object KMeans {
     MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
   }
 
-  /**
-   * :: Experimental ::
-   */
   @Experimental
   def main(args: Array[String]) {
     if (args.length < 4) {

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala

Lines changed: 0 additions & 1 deletion
@@ -34,7 +34,6 @@ case class MatrixEntry(i: Long, j: Long, value: Double)
 
 /**
  * :: Experimental ::
- *
 * Represents a matrix in coordinate format.
 *
 * @param entries matrix entries

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala

Lines changed: 0 additions & 2 deletions
@@ -26,15 +26,13 @@ import org.apache.spark.mllib.linalg.SingularValueDecomposition
 
 /**
  * :: Experimental ::
- *
 * Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]].
 */
 @Experimental
 case class IndexedRow(index: Long, vector: Vector)
 
 /**
  * :: Experimental ::
- *
 * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
 * indexed rows.
 *

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 0 additions & 1 deletion
@@ -30,7 +30,6 @@ import org.apache.spark.Logging
 
 /**
  * :: Experimental ::
- *
 * Represents a row-oriented distributed Matrix with no meaningful row indices.
 *
 * @param rows rows stored as an RDD[Vector]
