diff --git a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 1fde16bfa4e3..800a6dcfda8d 100644
--- a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -47,7 +47,14 @@ class StreamingQueryException private[sql](
       cause: Throwable,
       errorClass: String,
       messageParameters: Map[String, String]) = {
-    this("", message, cause, null, null, errorClass, messageParameters)
+    this(
+      messageParameters.get("queryDebugString").getOrElse(""),
+      message,
+      cause,
+      messageParameters.get("startOffset").getOrElse(""),
+      messageParameters.get("endOffset").getOrElse(""),
+      errorClass,
+      messageParameters)
   }
 
   def this(
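Note: the auxiliary constructor above now recovers the query debug string and the start/end offsets from `messageParameters` instead of passing empty placeholders. The standalone sketch below (illustrative only, with invented map contents; it is not part of the patch) shows the `get(...).getOrElse("")` lookup pattern it relies on, including the fallback when a key is absent:

object MessageParametersLookupSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical parameter map, mimicking what the server side now populates.
    val messageParameters = Map(
      "id" -> "00000000-0000-0000-0000-000000000001",
      "runId" -> "00000000-0000-0000-0000-000000000002",
      "startOffset" -> "{\"rate\":{\"0\":10}}",
      "endOffset" -> "{\"rate\":{\"0\":12}}")

    // Same pattern as the constructor: absent keys (here, queryDebugString) fall back to "".
    val queryDebugString = messageParameters.get("queryDebugString").getOrElse("")
    val startOffset = messageParameters.get("startOffset").getOrElse("")
    val endOffset = messageParameters.get("endOffset").getOrElse("")

    println(s"debug='$queryDebugString', startOffset=$startOffset, endOffset=$endOffset")
  }
}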
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 48ef0a907b5b..13a26fa79085 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -109,15 +109,18 @@ trait StreamingQuery {
   def lastProgress: StreamingQueryProgress
 
   /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
+   * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If
+   * the query has terminated with an exception, then the exception will be thrown.
    *
    * If the query has terminated, then all subsequent calls to this method will either return
-   * immediately (if the query was terminated by `stop()`).
+   * immediately (if the query was terminated by `stop()`), or throw the exception immediately
+   * (if the query terminated with an exception).
    *
+   * @throws StreamingQueryException
+   *   if the query has terminated with an exception.
    * @since 3.5.0
    */
-  // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
-  // framework is supported and modify the doc accordingly.
+  @throws[StreamingQueryException]
   def awaitTermination(): Unit
 
   /**
@@ -125,13 +128,15 @@ trait StreamingQuery {
    * the query has terminated with an exception, then the exception will be thrown. Otherwise, it
    * returns whether the query has terminated or not within the `timeoutMs` milliseconds.
    *
-   * If the query has terminated, then all subsequent calls to this method will return `true`
-   * immediately.
+   * If the query has terminated, then all subsequent calls to this method will either return
+   * `true` immediately (if the query was terminated by `stop()`), or throw the exception
+   * immediately (if the query terminated with an exception).
    *
+   * @throws StreamingQueryException
+   *   if the query has terminated with an exception.
    * @since 3.5.0
    */
-  // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
-  // framework is supported and modify the doc accordingly.
+  @throws[StreamingQueryException]
   def awaitTermination(timeoutMs: Long): Boolean
 
   /**
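Note: with the `@throws` annotations in place, client code can rely on `awaitTermination()` surfacing the failure instead of returning silently. A hedged usage sketch follows; it assumes an active Spark Connect `SparkSession` named `spark` (for example in a REPL) and is not taken from the patch itself:

import org.apache.spark.sql.streaming.StreamingQueryException

val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .writeStream
  .format("console")
  .start()

try {
  // Blocks until stop() is called or the query fails; a failure is rethrown here.
  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    println(s"Query failed with error class ${e.getErrorClass}: ${e.getMessage}")
} finally {
  query.stop()
}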
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index bea103880149..fd33efd72193 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -92,14 +92,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
    * immediately (if the query was terminated with exception). Use `resetTerminated()` to clear
    * past terminations and wait for new terminations.
    *
-   * For correctly documenting exceptions across multiple queries, users need to stop all of them
-   * after any of them terminates with exception, and then check the `query.exception()` for each
-   * query.
+   * In the case where multiple queries have terminated since `resetTerminated()` was called, if
+   * any query has terminated with an exception, then `awaitAnyTermination()` will throw one of
+   * those exceptions. To correctly report exceptions across multiple queries, users need to
+   * stop all of them after any of them terminates with an exception, and then check
+   * `query.exception()` for each query.
    *
+   * @throws StreamingQueryException
+   *   if any query has terminated with an exception.
    * @since 3.5.0
    */
-  // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
-  // framework is supported and modify the doc accordingly.
+  @throws[StreamingQueryException]
   def awaitAnyTermination(): Unit = {
     executeManagerCmd(_.getAwaitAnyTerminationBuilder.build())
   }
@@ -115,14 +118,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
    * exception immediately (if the query was terminated with exception). Use `resetTerminated()`
    * to clear past terminations and wait for new terminations.
    *
-   * For correctly documenting exceptions across multiple queries, users need to stop all of them
-   * after any of them terminates with exception, and then check the `query.exception()` for each
-   * query.
+   * In the case where multiple queries have terminated since `resetTerminated()` was called, if
+   * any query has terminated with an exception, then `awaitAnyTermination()` will throw one of
+   * those exceptions. To correctly report exceptions across multiple queries, users need to
+   * stop all of them after any of them terminates with an exception, and then check
+   * `query.exception()` for each query.
    *
+   * @throws StreamingQueryException
+   *   if any query has terminated with an exception.
    * @since 3.5.0
    */
-  // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
-  // framework is supported and modify the doc accordingly.
+  @throws[StreamingQueryException]
   def awaitAnyTermination(timeoutMs: Long): Boolean = {
     require(timeoutMs > 0, "Timeout has to be positive")
     executeManagerCmd(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index f995a0c3d947..91c562c0f98b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -176,9 +176,7 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
   }
 
   test("throw exception in streaming") {
-    // Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the
-    // netty header limit.
-    withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") {
+    try {
       val session = spark
       import session.implicits._
 
@@ -204,14 +202,57 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
       }
 
       assert(exception.getErrorClass != null)
-      assert(!exception.getMessageParameters.isEmpty)
+      assert(exception.getMessageParameters().get("id") == query.id.toString)
+      assert(exception.getMessageParameters().get("runId") == query.runId.toString)
+      assert(!exception.getMessageParameters().get("startOffset").isEmpty)
+      assert(!exception.getMessageParameters().get("endOffset").isEmpty)
       assert(exception.getCause.isInstanceOf[SparkException])
       assert(exception.getCause.getCause.isInstanceOf[SparkException])
       assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
       assert(
         exception.getCause.getCause.getCause.getMessage
           .contains("java.lang.RuntimeException: Number 2 encountered!"))
-    }
+    } finally {
+      spark.streams.resetTerminated()
+    }
+  }
+
+  test("throw exception in streaming, check with StreamingQueryManager") {
+    val session = spark
+    import session.implicits._
+
+    val checkForTwo = udf((value: Int) => {
+      if (value == 2) {
+        throw new RuntimeException("Number 2 encountered!")
+      }
+      value
+    })
+
+    val query = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "1")
+      .load()
+      .select(checkForTwo($"value").as("checkedValue"))
+      .writeStream
+      .outputMode("append")
+      .format("console")
+      .start()
+
+    val exception = intercept[StreamingQueryException] {
+      spark.streams.awaitAnyTermination()
+    }
+
+    assert(exception.getErrorClass != null)
+    assert(exception.getMessageParameters().get("id") == query.id.toString)
+    assert(exception.getMessageParameters().get("runId") == query.runId.toString)
+    assert(!exception.getMessageParameters().get("startOffset").isEmpty)
+    assert(!exception.getMessageParameters().get("endOffset").isEmpty)
+    assert(exception.getCause.isInstanceOf[SparkException])
+    assert(exception.getCause.getCause.isInstanceOf[SparkException])
+    assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
+    assert(
+      exception.getCause.getCause.getCause.getMessage
+        .contains("java.lang.RuntimeException: Number 2 encountered!"))
   }
 
   test("foreach Row") {
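Note: the new test exercises the same contract through `StreamingQueryManager`. The sketch below, again assuming an active `SparkSession` named `spark` with one or more running streaming queries (it is not part of the patch), shows the pattern the updated scaladoc recommends: let `awaitAnyTermination()` throw, then stop every query and inspect each one's `exception()` before clearing past terminations with `resetTerminated()`:

import org.apache.spark.sql.streaming.StreamingQueryException

try {
  // Throws as soon as any active query terminates with an error.
  spark.streams.awaitAnyTermination()
} catch {
  case _: StreamingQueryException =>
    // Only one exception is thrown even if several queries failed, so stop them all
    // and look at each query's own exception to get the full picture.
    val queries = spark.streams.active
    queries.foreach(_.stop())
    queries.foreach(q => println(s"query ${q.id}: ${q.exception}"))
} finally {
  // Clear past terminations so later awaitAnyTermination() calls wait for new ones.
  spark.streams.resetTerminated()
}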
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index d9b3b207976c..aac26a727689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -329,7 +329,11 @@ abstract class StreamExecution(
           messageParameters = Map(
             "id" -> id.toString,
             "runId" -> runId.toString,
-            "message" -> message))
+            "message" -> message,
+            "queryDebugString" -> toDebugString(includeLogicalPlan = isInitialized),
+            "startOffset" -> committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
+            "endOffset" -> availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString
+          ))
         logError(s"Query $prettyIdString terminated with error", e)
         updateStatusMessage(s"Terminated with exception: $message")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
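Note: on the server side, `StreamExecution` now records the query debug string and the committed/available offsets in the error's message parameters, which is what the new client-side assertions read back. A hedged sketch of how a caller might inspect them (it reuses the `spark` session and `query` from the earlier sketches; the key names match the patch, everything else is illustrative):

import org.apache.spark.sql.streaming.StreamingQueryException

try {
  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    val params = e.getMessageParameters // java.util.Map[String, String]
    // "id" and "runId" identify the failed run; the offsets bound the failed trigger.
    assert(params.get("id") == query.id.toString)
    assert(params.get("runId") == query.runId.toString)
    println(s"failed between ${params.get("startOffset")} and ${params.get("endOffset")}")
}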