Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions common/utils/src/main/scala/org/apache/spark/SparkException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,28 @@ class SparkException(

object SparkException {
def internalError(msg: String, context: Array[QueryContext], summary: String): SparkException = {
internalError(msg = msg, context = context, summary = summary, category = None)
}

def internalError(
msg: String,
context: Array[QueryContext],
summary: String,
category: Option[String]): SparkException = {
new SparkException(
errorClass = "INTERNAL_ERROR",
errorClass = "INTERNAL_ERROR" + category.map("_" + _).getOrElse(""),
messageParameters = Map("message" -> msg),
cause = null,
context,
summary)
}

def internalError(msg: String): SparkException = {
internalError(msg, context = Array.empty[QueryContext], summary = "")
internalError(msg, context = Array.empty[QueryContext], summary = "", category = None)
}

def internalError(msg: String, category: String): SparkException = {
internalError(msg, context = Array.empty[QueryContext], summary = "", category = Some(category))
}

def internalError(msg: String, cause: Throwable): SparkException = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[spark] object SparkThrowableHelper {
}

def isInternalError(errorClass: String): Boolean = {
errorClass == "INTERNAL_ERROR"
errorClass.startsWith("INTERNAL_ERROR")
}

def getMessage(e: SparkThrowable with Throwable, format: ErrorMessageFormat.Value): String = {
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,12 @@
],
"sqlState" : "XX000"
},
"INTERNAL_ERROR_BROADCAST" : {
"message" : [
"<message>"
],
"sqlState" : "XX000"
},
"INTERVAL_ARITHMETIC_OVERFLOW" : {
"message" : [
"<message>.<alternative>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Lo
/** Check if this broadcast is valid. If not valid, exception is thrown. */
protected def assertValid(): Unit = {
if (!_isValid) {
throw new SparkException(
"Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
throw SparkException.internalError(
"Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite),
category = "BROADCAST")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
throw SparkException.internalError(
s"Failed to store $broadcastId in BlockManager", category = "BROADCAST")
}
}
try {
Expand All @@ -168,8 +169,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
val pieceId = BroadcastBlockId(id, "piece" + i)
val bytes = new ChunkedByteBuffer(block.duplicate())
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
s"in local BlockManager")
throw SparkException.internalError(s"Failed to store $pieceId of $broadcastId " +
s"in local BlockManager", category = "BROADCAST")
}
}
blocks.length
Expand Down Expand Up @@ -204,19 +205,22 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
if (checksumEnabled) {
val sum = calcChecksum(b.chunks(0))
if (sum != checksums(pid)) {
throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
s" $sum != ${checksums(pid)}")
throw SparkException.internalError(
s"corrupt remote block $pieceId of $broadcastId: $sum != ${checksums(pid)}",
category = "BROADCAST")
}
}
// We found the block from remote executors/driver's BlockManager, so put the block
// in this executor's BlockManager.
if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(
s"Failed to store $pieceId of $broadcastId in local BlockManager")
throw SparkException.internalError(
s"Failed to store $pieceId of $broadcastId in local BlockManager",
category = "BROADCAST")
}
blocks(pid) = new ByteBufferBlockData(b, true)
case None =>
throw new SparkException(s"Failed to get $pieceId of $broadcastId")
throw SparkException.internalError(
s"Failed to get $pieceId of $broadcastId", category = "BROADCAST")
}
}
}
Expand Down Expand Up @@ -265,7 +269,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO

x
} else {
throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
throw SparkException.internalError(
s"Failed to get locally stored broadcast data: $broadcastId",
category = "BROADCAST")
}
case None =>
val estimatedTotalSize = Utils.bytesToString(numBlocks.toLong * blockSize)
Expand All @@ -284,7 +290,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
// need to re-fetch it.
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
throw SparkException.internalError(
s"Failed to store $broadcastId in BlockManager", category = "BROADCAST")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class SparkThrowableSuite extends SparkFunSuite {
test("Message format invariants") {
val messageFormats = errorReader.errorInfoMap
.filterKeys(!_.startsWith("_LEGACY_ERROR_TEMP_"))
.filterKeys(!_.startsWith("INTERNAL_ERROR"))
.values.toSeq.flatMap { i => Seq(i.messageTemplate) }
checkCondition(messageFormats, s => s != null)
checkIfUnique(messageFormats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,15 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
if (removeFromDriver) {
// Using this variable on the executors crashes them, which hangs the test.
// Instead, crash the driver by directly accessing the broadcast value.
intercept[SparkException] { broadcast.value }
intercept[SparkException] { broadcast.unpersist(blocking = true) }
intercept[SparkException] { broadcast.destroy(blocking = true) }
val e1 = intercept[SparkException] { broadcast.value }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the checkError util for testing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, we don't need to check error message here.

assert(e1.isInternalError)
assert(e1.getErrorClass == "INTERNAL_ERROR_BROADCAST")
val e2 = intercept[SparkException] { broadcast.unpersist(blocking = true) }
assert(e2.isInternalError)
assert(e2.getErrorClass == "INTERNAL_ERROR_BROADCAST")
val e3 = intercept[SparkException] { broadcast.destroy(blocking = true) }
assert(e3.isInternalError)
assert(e3.getErrorClass == "INTERNAL_ERROR_BROADCAST")
} else {
val results = sc.parallelize(1 to partitions, partitions).map(x => (x, broadcast.value.sum))
assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)
Expand All @@ -332,6 +338,8 @@ package object testPackage extends Assertions {
broadcast.destroy(blocking = true)
val thrown = intercept[SparkException] { broadcast.value }
assert(thrown.getMessage.contains("BroadcastSuite.scala"))
assert(thrown.isInternalError)
assert(thrown.getErrorClass == "INTERNAL_ERROR_BROADCAST")
}

}