
Commit dd681f5

Mridul Muralidharan authored and aarondav committed
SPARK-1587 Fix thread leak
mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm.

Author: Mridul Muralidharan <[email protected]>

Closes #504 from mridulm/resource_leak_fixes and squashes the following commits:

a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail
7b5e19c [Mridul Muralidharan] Prevent NPE while running tests
1 parent bb68f47 commit dd681f5
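
Because scalatest runs every suite in the same JVM, any non-daemon thread that survives SparkContext.stop() accumulates across suites until mvn test becomes flaky. A minimal, hypothetical probe for that kind of leak (not part of this commit, the class and app names are made up) is to compare the set of live threads before and after one SparkContext lifecycle:

import scala.collection.JavaConverters._

import org.apache.spark.SparkContext

// Hypothetical leak probe, not part of this commit: compare the live threads
// before and after a full SparkContext lifecycle and report any survivors.
object ThreadLeakProbe {
  private def liveThreadNames: Set[String] =
    Thread.getAllStackTraces.keySet.asScala.filter(_.isAlive).map(_.getName).toSet

  def main(args: Array[String]): Unit = {
    val before = liveThreadNames
    val sc = new SparkContext("local", "thread-leak-probe")
    sc.parallelize(1 to 100).count()
    sc.stop()
    Thread.sleep(1000)  // give stop() a moment to settle
    val leaked = liveThreadNames -- before
    if (leaked.nonEmpty) {
      println("Threads surviving SparkContext.stop(): " + leaked.mkString(", "))
    }
  }
}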


7 files changed: 42 additions, 21 deletions


core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 12 additions & 10 deletions
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String,
 
     sinkConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
-      try {
-        val sink = Class.forName(classPath)
-          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-          .newInstance(kv._2, registry, securityMgr)
-        if (kv._1 == "servlet") {
-          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-        } else {
-          sinks += sink.asInstanceOf[Sink]
+      if (null != classPath) {
+        try {
+          val sink = Class.forName(classPath)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+            .newInstance(kv._2, registry, securityMgr)
+          if (kv._1 == "servlet") {
+            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+          } else {
+            sinks += sink.asInstanceOf[Sink]
+          }
+        } catch {
+          case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
         }
-      } catch {
-        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
       }
     }
   }
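
The new null check addresses the NPE mentioned in the commit message: java.util.Properties.getProperty returns null when a sink block has no "class" entry, and passing that null on to Class.forName fails. A small self-contained illustration of the behaviour the guard handles (the object and property names here are invented for the example):

import java.util.Properties

// Illustration only: Properties.getProperty returns null, not an empty string,
// when the key is absent, which is the case the new guard handles.
object MissingSinkClassDemo {
  def main(args: Array[String]): Unit = {
    val sinkProps = new Properties()
    sinkProps.setProperty("period", "10")  // a sink block that never sets "class"

    val classPath = sinkProps.getProperty("class")
    if (null != classPath) {
      println("Would instantiate sink " + classPath)
    } else {
      println("No 'class' configured for this sink, skipping it")
    }
  }
}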

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 0 deletions
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }
+    starvationTimer.cancel()
 
     // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
     Thread.sleep(1000L)
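
starvationTimer is a java.util.Timer, and a Timer owns a worker thread that keeps running until cancel() is called; the metadataCleaner.cancel() added further down follows the same reasoning. A standalone sketch of that pattern (the class and field names are illustrative, not Spark's):

import java.util.{Timer, TimerTask}

// Standalone sketch of the leak being fixed: a java.util.Timer owns a worker
// thread, and that thread only goes away once cancel() is called.
class PeriodicStarvationCheck {
  private val checkTimer = new Timer("starvation-check-demo")  // non-daemon thread by default

  def start(): Unit = {
    checkTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = println("checking whether any task set is starved...")
    }, 0L, 15000L)
  }

  def stop(): Unit = {
    checkTimer.cancel()  // without this, the timer thread outlives stop() and leaks
  }
}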

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 0 deletions
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    shuffleBlockManager.stop()
+    diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 17 additions & 11 deletions
@@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach { localDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
-          } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
-          }
-        }
+        stop()
+      }
+    })
+  }
 
-        if (shuffleSender != null) {
-          shuffleSender.stop()
+  private[spark] def stop() {
+    localDirs.foreach { localDir =>
+      if (localDir.isDirectory() && localDir.exists()) {
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+        } catch {
+          case t: Throwable =>
+            logError("Exception while deleting local spark dir: " + localDir, t)
         }
       }
-    })
+    }
+
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
   }
 
   private[storage] def startShuffleBlockSender(port: Int): Int = {
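
The shutdown-hook body is folded into a private[spark] stop() so the same cleanup can be driven from BlockManager.stop() and from tests, not only at JVM exit. A simplified, hypothetical sketch of that delegation pattern (LocalDirCleaner is a stand-in, not a Spark class):

import java.io.File

// Simplified, hypothetical sketch of the refactoring applied here: the shutdown
// hook only delegates to a public stop(), so the same cleanup can also be run
// explicitly (for example from a test's afterEach) without a JVM shutdown.
class LocalDirCleaner(localDirs: Seq[File]) {

  Runtime.getRuntime.addShutdownHook(new Thread("delete local dirs (demo)") {
    override def run(): Unit = stop()
  })

  def stop(): Unit = {
    localDirs.foreach { dir =>
      if (dir.isDirectory && dir.exists()) {
        try {
          deleteRecursively(dir)
        } catch {
          case t: Throwable => println("Exception while deleting " + dir + ": " + t)
        }
      }
    }
  }

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete()
  }
}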

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 4 additions & 0 deletions
@@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
+
+  def stop() {
+    metadataCleaner.cancel()
+  }
 }
 
 private[spark]

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 1 addition & 0 deletions
@@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging {
           (server, server.getConnectors.head.getLocalPort)
         case f: Failure[_] =>
           server.stop()
+          pool.stop()
           logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
           logInfo("Error was: " + f.toString)
           connect((currentPort + 1) % 65536)
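
When server.start() fails to bind, stopping only the Server can leave the separately created QueuedThreadPool, which is its own Jetty LifeCycle with worker threads, still running, so each bind-and-retry would leak another pool. A rough sketch of the idea, not the exact JettyUtils wiring:

import scala.util.{Failure, Success, Try}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.util.thread.QueuedThreadPool

// Rough sketch (not the exact JettyUtils wiring) of why pool.stop() matters:
// the thread pool is its own Jetty LifeCycle, so stopping only the Server can
// leave the pool's worker threads alive after every failed bind attempt.
object BindWithRetryDemo {
  def tryBind(port: Int): Unit = {
    val server = new Server(port)
    val pool = new QueuedThreadPool()
    pool.setDaemon(true)

    Try { pool.start(); server.start() } match {
      case Success(_) =>
        println("Bound to port " + port)
      case Failure(e) =>
        server.stop()
        pool.stop()  // without this, leaked pool threads pile up across retries
        println("Port " + port + " unavailable: " + e)
    }
  }
}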

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala

Lines changed: 5 additions & 0 deletions
@@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
+  override def afterEach() {
+    diskBlockManager.stop()
+    shuffleBlockManager.idToSegmentMap.clear()
+  }
+
   test("basic block creation") {
     val blockId = new TestBlockId("test")
     assertSegmentEquals(blockId, blockId.name, 0, 0)
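
The new afterEach() mirrors the production-side change: each test stops the DiskBlockManager it created instead of leaving cleanup to JVM exit. A generic sketch of the ScalaTest BeforeAndAfterEach pattern, using a stand-in resource class rather than a Spark one:

import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Generic sketch of the ScalaTest pattern used by the new afterEach(): any
// resource that owns a thread is stopped after every test so nothing leaks
// into later suites sharing the JVM. ThreadOwningResource is a stand-in, not
// a Spark class.
class ThreadOwningResource {
  private val worker = new Thread(new Runnable {
    override def run(): Unit =
      try { while (true) Thread.sleep(100) } catch { case _: InterruptedException => () }
  }, "demo-worker")

  def start(): Unit = worker.start()
  def stop(): Unit = worker.interrupt()
}

class ResourceCleanupSuite extends FunSuite with BeforeAndAfterEach {
  private var resource: ThreadOwningResource = _

  override def beforeEach() {
    resource = new ThreadOwningResource
    resource.start()
  }

  override def afterEach() {
    resource.stop()  // mirrors diskBlockManager.stop() in the suite above
  }

  test("resource is available inside a test") {
    assert(resource != null)
  }
}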
