Commit 7a89250

Merge pull request alteryx#20 from markhamstra/SPY-302
SPY-302 pick up dropped backports
2 parents (d317446 + 2ebad24), commit 7a89250

File tree: 12 files changed, +170 -64 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 0 additions & 1 deletion

@@ -218,7 +218,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e: Throwable => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 1 addition & 3 deletions

@@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       try {
         new Socket(daemonHost, daemonPort)
       } catch {
-        case exc: SocketException => {
+        case exc: SocketException =>
           logWarning("Python daemon unexpectedly quit, attempting to restart")
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        }
-        case e: Throwable => throw e
       }
     }
   }

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 1 addition & 1 deletion

@@ -89,7 +89,7 @@ object SparkHadoopUtil {
         .newInstance()
         .asInstanceOf[SparkHadoopUtil]
     } catch {
-      case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      case e: Exception => throw new SparkException("Unable to load YARN support", e)
    }
  } else {
    new SparkHadoopUtil
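
Note: the hunk above narrows the catch from Throwable to Exception so that fatal JVM errors are no longer swallowed while loading YARN support. A minimal standalone sketch of the same pattern (illustrative only; the object and class names below are invented and are not part of this commit):

object ReflectiveLoadSketch {
  // Wrap only recoverable problems (missing class, bad constructor, etc.);
  // let OutOfMemoryError and other fatal Throwables propagate untouched.
  def loadInstance(className: String): AnyRef =
    try {
      Class.forName(className).newInstance().asInstanceOf[AnyRef]
    } catch {
      case e: Exception => throw new RuntimeException("Unable to load " + className, e)
    }

  def main(args: Array[String]): Unit = {
    try loadInstance("com.example.DoesNotExist")   // hypothetical class name
    catch { case e: RuntimeException => println(e.getMessage + ": " + e.getCause) }
  }
}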

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 12 additions & 8 deletions

@@ -31,6 +31,8 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.Utils
+

 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,

@@ -88,14 +90,16 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            markDead()
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              logError("All masters are unresponsive! Giving up.")
+              markDead()
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 10 additions & 8 deletions

@@ -163,14 +163,16 @@ private[spark] class Worker(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            System.exit(1)
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              logError("All masters are unresponsive! Giving up.")
+              System.exit(1)
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }
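
Note: both this hunk and the AppClient hunk above wrap the body of a repeating scheduler task in Utils.tryOrExit, because an exception thrown inside a scheduled task is otherwise swallowed by the scheduler and the retry loop silently dies. A minimal self-contained sketch of that failure mode and the wrapper (illustrative only; exitOnFailure stands in for Utils.tryOrExit and is not part of the commit):

import java.util.concurrent.{Executors, TimeUnit}

object TryOrExitSketch {
  // Forward any escaping Throwable to the default uncaught exception handler,
  // so the process fails fast instead of limping along with a dead timer task.
  def exitOnFailure(block: => Unit): Unit = {
    try {
      block
    } catch {
      case t: Throwable =>
        Option(Thread.getDefaultUncaughtExceptionHandler) match {
          case Some(h) => h.uncaughtException(Thread.currentThread(), t)
          case None    => throw t
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = exitOnFailure {
        // registration / heartbeat work would go here
        sys.error("simulated registration failure")
      }
    }, 0, 1, TimeUnit.SECONDS)
  }
}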

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 11 additions & 27 deletions

@@ -78,28 +78,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
   }

   val executorSource = new ExecutorSource(this, executorId)

@@ -257,6 +236,11 @@
         }

       case t: Throwable => {
+        // Attempt to exit cleanly by informing the driver of our failure.
+        // If anything goes wrong (or this was a fatal exception), we will delegate to
+        // the default uncaught exception handler, which will terminate the Executor.
+        logError("Exception in task ID " + taskId, t)
+
         val serviceTime = (System.currentTimeMillis() - taskStart).toInt
         val metrics = attemptedTask.flatMap(t => t.metrics)
         for (m <- metrics) {

@@ -266,11 +250,11 @@
         val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
         execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

-        // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-        // have left some weird state around depending on when the exception was thrown, but on
-        // the other hand, maybe we could detect that when future tasks fail and exit then.
-        logError("Exception in task ID " + taskId, t)
-        //System.exit(1)
+        // Don't forcibly exit unless the exception was inherently fatal, to avoid
+        // stopping other tasks unnecessarily.
+        if (Utils.isFatalError(t)) {
+          ExecutorUncaughtExceptionHandler.uncaughtException(t)
+        }
       }
     } finally {
       // TODO: Unregister shuffle memory only for ResultTask

core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 21 additions & 3 deletions

@@ -133,7 +133,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
       }
     }
   }

@@ -842,8 +842,26 @@
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+
+    def getIterator = {
+      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+      serializer.newInstance().deserializeStream(stream).asIterator
+    }
+
+    if (blockId.isShuffle) {
+      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+      // at the beginning. The wrapping will cost some memory (compression instance
+      // initialization, etc.). Reducer read shuffle blocks one by one so we could do the
+      // wrapping lazily to save memory.
+      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+        lazy val proxy = f
+        override def hasNext: Boolean = proxy.hasNext
+        override def next(): Any = proxy.next()
+      }
+      new LazyProxyIterator(getIterator)
+    } else {
+      getIterator
+    }
   }

   def stop() {
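
Note: the LazyProxyIterator introduced above defers building the deserialization stream until the iterator is first consumed. A small standalone sketch of the same idea (illustrative only; the names below are not from the commit):

object LazyIteratorSketch {
  // A by-name parameter plus a lazy val: the underlying iterator is built on first use.
  class LazyProxyIterator[A](f: => Iterator[A]) extends Iterator[A] {
    private lazy val proxy = f
    override def hasNext: Boolean = proxy.hasNext
    override def next(): A = proxy.next()
  }

  def main(args: Array[String]): Unit = {
    def expensive(): Iterator[Int] = {
      println("building iterator (think: allocating a decompression stream)")
      Iterator(1, 2, 3)
    }
    val it = new LazyProxyIterator(expensive())   // nothing built yet
    println("wrapped, not yet initialized")
    println(it.next())                            // construction happens here
  }
}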

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 3 additions & 3 deletions

@@ -134,14 +134,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         localDirs.foreach { localDir =>
           try {
             if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
           } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
+            case e: Exception =>
+              logError("Exception while deleting local spark dir: " + localDir, e)
           }
         }

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 42 additions & 8 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.util

 import java.io._
 import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
+import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}

@@ -27,19 +28,16 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
+import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder

-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
-import org.apache.hadoop.io._
-
-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
-import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkException, Logging}
-
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
+import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /**
  * Various utility methods used by Spark.

@@ -621,6 +619,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }

+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.

@@ -833,4 +843,28 @@ private[spark] object Utils extends Logging {
     System.currentTimeMillis - start
   }

+  /**
+   * Executes the given block, printing and re-throwing any uncaught exceptions.
+   * This is particularly useful for wrapping code that runs in a thread, to ensure
+   * that exceptions are printed, and to avoid having to catch Throwable.
+   */
+  def logUncaughtExceptions[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        throw t
+    }
+  }
+
+  /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
+  def isFatalError(e: Throwable): Boolean = {
+    e match {
+      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+        false
+      case _ =>
+        true
+    }
+  }
 }
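
Note: a quick usage sketch of the fatal-error classification added above, assuming the definitions in this hunk (illustrative only; the object name is invented and not part of the commit):

import scala.util.control.{ControlThrowable, NonFatal}

object UtilsHelpersSketch {
  // Same classification as Utils.isFatalError above.
  def isFatalError(e: Throwable): Boolean = e match {
    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
      false
    case _ =>
      true
  }

  def main(args: Array[String]): Unit = {
    println(isFatalError(new IllegalStateException("bad state")))  // false: ordinary exception
    println(isFatalError(new InterruptedException()))              // false: explicitly non-fatal here
    println(isFatalError(new OutOfMemoryError()))                  // true: VM errors are fatal
    println(isFatalError(new StackOverflowError()))                // true
  }
}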
