Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ class StreamingQueryException private[sql](
cause: Throwable,
errorClass: String,
messageParameters: Map[String, String]) = {
this("", message, cause, null, null, errorClass, messageParameters)
this(
messageParameters.get("queryDebugString").getOrElse(""),
message,
cause,
messageParameters.get("startOffset").getOrElse(""),
messageParameters.get("endOffset").getOrElse(""),
errorClass,
messageParameters)
}

def this(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,29 +109,34 @@ trait StreamingQuery {
def lastProgress: StreamingQueryProgress

/**
* Waits for the termination of `this` query, either by `query.stop()` or by an exception.
* Waits for the termination of `this` query, either by `query.stop()` or by an exception. If
* the query has terminated with an exception, then the exception will be thrown.
*
* If the query has terminated, then all subsequent calls to this method will either return
* immediately (if the query was terminated by `stop()`).
* immediately (if the query was terminated by `stop()`), or throw the exception immediately (if
* the query has terminated with exception).
*
* @throws StreamingQueryException
* if the query has terminated with an exception.
* @since 3.5.0
*/
// TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
// framework is supported and modify the doc accordingly.
@throws[StreamingQueryException]
def awaitTermination(): Unit

/**
* Waits for the termination of `this` query, either by `query.stop()` or by an exception. If
* the query has terminated with an exception, then the exception will be thrown. Otherwise, it
* returns whether the query has terminated or not within the `timeoutMs` milliseconds.
*
* If the query has terminated, then all subsequent calls to this method will return `true`
* immediately.
* If the query has terminated, then all subsequent calls to this method will either return
* `true` immediately (if the query was terminated by `stop()`), or throw the exception
* immediately (if the query has terminated with exception).
*
* @throws StreamingQueryException
* if the query has terminated with an exception
* @since 3.5.0
*/
// TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
// framework is supported and modify the doc accordingly.
@throws[StreamingQueryException]
def awaitTermination(timeoutMs: Long): Boolean

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
* immediately (if the query was terminated with exception). Use `resetTerminated()` to clear
* past terminations and wait for new terminations.
*
* For correctly documenting exceptions across multiple queries, users need to stop all of them
* after any of them terminates with exception, and then check the `query.exception()` for each
* query.
* In the case where multiple queries have terminated since `resetTermination()` was called, if
* any query has terminated with exception, then `awaitAnyTermination()` will throw any of the
* exception. For correctly documenting exceptions across multiple queries, users need to stop
* all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
* @throws StreamingQueryException
* if any query has terminated with an exception
* @since 3.5.0
*/
// TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
// framework is supported and modify the doc accordingly.
@throws[StreamingQueryException]
def awaitAnyTermination(): Unit = {
executeManagerCmd(_.getAwaitAnyTerminationBuilder.build())
}
Expand All @@ -115,14 +118,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
* exception immediately (if the query was terminated with exception). Use `resetTerminated()`
* to clear past terminations and wait for new terminations.
*
* For correctly documenting exceptions across multiple queries, users need to stop all of them
* after any of them terminates with exception, and then check the `query.exception()` for each
* query.
* In the case where multiple queries have terminated since `resetTermination()` was called, if
* any query has terminated with exception, then `awaitAnyTermination()` will throw any of the
* exception. For correctly documenting exceptions across multiple queries, users need to stop
* all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
* @throws StreamingQueryException
* if any query has terminated with an exception
* @since 3.5.0
*/
// TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
// framework is supported and modify the doc accordingly.
@throws[StreamingQueryException]
def awaitAnyTermination(timeoutMs: Long): Boolean = {
require(timeoutMs > 0, "Timeout has to be positive")
executeManagerCmd(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
}

test("throw exception in streaming") {
// Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the
// netty header limit.
withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") {
try {
val session = spark
import session.implicits._

Expand All @@ -204,14 +202,57 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
}

assert(exception.getErrorClass != null)
assert(!exception.getMessageParameters.isEmpty)
assert(exception.getMessageParameters().get("id") == query.id.toString)
assert(exception.getMessageParameters().get("runId") == query.runId.toString)
assert(!exception.getMessageParameters().get("startOffset").isEmpty)
assert(!exception.getMessageParameters().get("endOffset").isEmpty)
assert(exception.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
assert(
exception.getCause.getCause.getCause.getMessage
.contains("java.lang.RuntimeException: Number 2 encountered!"))
} finally {
spark.streams.resetTerminated()
}
}

test("throw exception in streaming, check with StreamingQueryManager") {
val session = spark
import session.implicits._

val checkForTwo = udf((value: Int) => {
if (value == 2) {
throw new RuntimeException("Number 2 encountered!")
}
value
})

val query = spark.readStream
.format("rate")
.option("rowsPerSecond", "1")
.load()
.select(checkForTwo($"value").as("checkedValue"))
.writeStream
.outputMode("append")
.format("console")
.start()

val exception = intercept[StreamingQueryException] {
spark.streams.awaitAnyTermination()
}

assert(exception.getErrorClass != null)
assert(exception.getMessageParameters().get("id") == query.id.toString)
assert(exception.getMessageParameters().get("runId") == query.runId.toString)
assert(!exception.getMessageParameters().get("startOffset").isEmpty)
assert(!exception.getMessageParameters().get("endOffset").isEmpty)
assert(exception.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.isInstanceOf[SparkException])
assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
assert(
exception.getCause.getCause.getCause.getMessage
.contains("java.lang.RuntimeException: Number 2 encountered!"))
}

test("foreach Row") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,11 @@ abstract class StreamExecution(
messageParameters = Map(
"id" -> id.toString,
"runId" -> runId.toString,
"message" -> message))
"message" -> message,
"queryDebugString" -> toDebugString(includeLogicalPlan = isInitialized),
"startOffset" -> committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
"endOffset" -> availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString
Comment on lines +333 to +335
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you pass additional parameters, you should add placeholders for them in the error formats in error-conditions.json otherwise they are useless. Found them in the PR: #48026

))
logError(s"Query $prettyIdString terminated with error", e)
updateStatusMessage(s"Terminated with exception: $message")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
Expand Down