diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala index eb908b826f77..4abf0fdf4984 100644 --- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala +++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala @@ -67,8 +67,16 @@ class SparkException( object SparkException { def internalError(msg: String, context: Array[QueryContext], summary: String): SparkException = { + internalError(msg = msg, context = context, summary = summary, category = None) + } + + def internalError( + msg: String, + context: Array[QueryContext], + summary: String, + category: Option[String]): SparkException = { new SparkException( - errorClass = "INTERNAL_ERROR", + errorClass = "INTERNAL_ERROR" + category.map("_" + _).getOrElse(""), messageParameters = Map("message" -> msg), cause = null, context, @@ -76,7 +84,11 @@ object SparkException { } def internalError(msg: String): SparkException = { - internalError(msg, context = Array.empty[QueryContext], summary = "") + internalError(msg, context = Array.empty[QueryContext], summary = "", category = None) + } + + def internalError(msg: String, category: String): SparkException = { + internalError(msg, context = Array.empty[QueryContext], summary = "", category = Some(category)) } def internalError(msg: String, cause: Throwable): SparkException = { diff --git a/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala b/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala index 2ca5b81ffb96..a504c25456c7 100644 --- a/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala +++ b/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala @@ -62,7 +62,7 @@ private[spark] object SparkThrowableHelper { } def isInternalError(errorClass: String): Boolean = { - errorClass == "INTERNAL_ERROR" + errorClass.startsWith("INTERNAL_ERROR") } def getMessage(e: SparkThrowable with Throwable, format: ErrorMessageFormat.Value): String = { diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 38502171a0a7..4719fb544f35 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -784,6 +784,12 @@ ], "sqlState" : "XX000" }, + "INTERNAL_ERROR_BROADCAST" : { + "message" : [ + "" + ], + "sqlState" : "XX000" + }, "INTERVAL_ARITHMETIC_OVERFLOW" : { "message" : [ "." diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 9ef6c7c5906a..445b7d4d7aa0 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -140,8 +140,9 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo /** Check if this broadcast is valid. If not valid, exception is thrown. */ protected def assertValid(): Unit = { if (!_isValid) { - throw new SparkException( - "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite)) + throw SparkException.internalError( + "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite), + category = "BROADCAST") } } diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 7b4307668518..e211bd5749af 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -152,7 +152,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO // Store a copy of the broadcast variable in the driver so that tasks run on the driver // do not create a duplicate copy of the broadcast variable's value. if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) { - throw new SparkException(s"Failed to store $broadcastId in BlockManager") + throw SparkException.internalError( + s"Failed to store $broadcastId in BlockManager", category = "BROADCAST") } } try { @@ -168,8 +169,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO val pieceId = BroadcastBlockId(id, "piece" + i) val bytes = new ChunkedByteBuffer(block.duplicate()) if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { - throw new SparkException(s"Failed to store $pieceId of $broadcastId " + - s"in local BlockManager") + throw SparkException.internalError(s"Failed to store $pieceId of $broadcastId " + + s"in local BlockManager", category = "BROADCAST") } } blocks.length @@ -204,19 +205,22 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO if (checksumEnabled) { val sum = calcChecksum(b.chunks(0)) if (sum != checksums(pid)) { - throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" + - s" $sum != ${checksums(pid)}") + throw SparkException.internalError( + s"corrupt remote block $pieceId of $broadcastId: $sum != ${checksums(pid)}", + category = "BROADCAST") } } // We found the block from remote executors/driver's BlockManager, so put the block // in this executor's BlockManager. if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) { - throw new SparkException( - s"Failed to store $pieceId of $broadcastId in local BlockManager") + throw SparkException.internalError( + s"Failed to store $pieceId of $broadcastId in local BlockManager", + category = "BROADCAST") } blocks(pid) = new ByteBufferBlockData(b, true) case None => - throw new SparkException(s"Failed to get $pieceId of $broadcastId") + throw SparkException.internalError( + s"Failed to get $pieceId of $broadcastId", category = "BROADCAST") } } } @@ -265,7 +269,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO x } else { - throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") + throw SparkException.internalError( + s"Failed to get locally stored broadcast data: $broadcastId", + category = "BROADCAST") } case None => val estimatedTotalSize = Utils.bytesToString(numBlocks.toLong * blockSize) @@ -284,7 +290,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO // need to re-fetch it. val storageLevel = StorageLevel.MEMORY_AND_DISK if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { - throw new SparkException(s"Failed to store $broadcastId in BlockManager") + throw SparkException.internalError( + s"Failed to store $broadcastId in BlockManager", category = "BROADCAST") } } diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index a8c56cf14606..f5b5ad2ab103 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -130,6 +130,7 @@ class SparkThrowableSuite extends SparkFunSuite { test("Message format invariants") { val messageFormats = errorReader.errorInfoMap .filterKeys(!_.startsWith("_LEGACY_ERROR_TEMP_")) + .filterKeys(!_.startsWith("INTERNAL_ERROR")) .values.toSeq.flatMap { i => Seq(i.messageTemplate) } checkCondition(messageFormats, s => s != null) checkIfUnique(messageFormats) diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 41452076f888..1efef3383b82 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -315,9 +315,15 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio if (removeFromDriver) { // Using this variable on the executors crashes them, which hangs the test. // Instead, crash the driver by directly accessing the broadcast value. - intercept[SparkException] { broadcast.value } - intercept[SparkException] { broadcast.unpersist(blocking = true) } - intercept[SparkException] { broadcast.destroy(blocking = true) } + val e1 = intercept[SparkException] { broadcast.value } + assert(e1.isInternalError) + assert(e1.getErrorClass == "INTERNAL_ERROR_BROADCAST") + val e2 = intercept[SparkException] { broadcast.unpersist(blocking = true) } + assert(e2.isInternalError) + assert(e2.getErrorClass == "INTERNAL_ERROR_BROADCAST") + val e3 = intercept[SparkException] { broadcast.destroy(blocking = true) } + assert(e3.isInternalError) + assert(e3.getErrorClass == "INTERNAL_ERROR_BROADCAST") } else { val results = sc.parallelize(1 to partitions, partitions).map(x => (x, broadcast.value.sum)) assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet) @@ -332,6 +338,8 @@ package object testPackage extends Assertions { broadcast.destroy(blocking = true) val thrown = intercept[SparkException] { broadcast.value } assert(thrown.getMessage.contains("BroadcastSuite.scala")) + assert(thrown.isInternalError) + assert(thrown.getErrorClass == "INTERNAL_ERROR_BROADCAST") } }