From 91ed5339a7046a9562a79ef5d0c25320f9fbb766 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 8 Dec 2023 12:00:53 -0800 Subject: [PATCH 1/8] wip --- .../spark/sql/streaming/StreamingQuery.scala | 24 ++++++---- .../sql/streaming/StreamingQueryManager.scala | 48 ++++++++++--------- .../streaming/ClientStreamingQuerySuite.scala | 39 +++++++++++++++ project/MimaExcludes.scala | 3 -- 4 files changed, 79 insertions(+), 35 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 48ef0a907b5b..55ed0986d384 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -110,28 +110,32 @@ trait StreamingQuery { /** * Waits for the termination of `this` query, either by `query.stop()` or by an exception. + * If the query has terminated with an exception, then the exception will be thrown. * * If the query has terminated, then all subsequent calls to this method will either return - * immediately (if the query was terminated by `stop()`). + * immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query has terminated with exception). * + * @throws StreamingQueryException if the query has terminated with an exception. * @since 3.5.0 */ - // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling - // framework is supported and modify the doc accordingly. + @throws[StreamingQueryException] def awaitTermination(): Unit /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If - * the query has terminated with an exception, then the exception will be thrown. Otherwise, it - * returns whether the query has terminated or not within the `timeoutMs` milliseconds. + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. + * If the query has terminated with an exception, then the exception will be thrown. + * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` + * milliseconds. * - * If the query has terminated, then all subsequent calls to this method will return `true` - * immediately. + * If the query has terminated, then all subsequent calls to this method will either return + * `true` immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query has terminated with exception). * + * @throws StreamingQueryException if the query has terminated with an exception * @since 3.5.0 */ - // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling - // framework is supported and modify the doc accordingly. 
+ @throws[StreamingQueryException]
def awaitTermination(timeoutMs: Long): Boolean

/**

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index bea103880149..7d8791afb4a9 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -83,46 +83,50 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}

/**
- * Wait until any of the queries on the associated SQLContext has terminated since the creation
- * of the context, or since `resetTerminated()` was called. If any query was terminated with an
- * exception, then the exception will be thrown.
+ * Wait until any of the queries on the associated SQLContext has terminated since the
+ * creation of the context, or since `resetTerminated()` was called. If any query was terminated
+ * with an exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
- * return immediately (if the query was terminated by `query.stop()`), or throw the exception
- * immediately (if the query was terminated with exception). Use `resetTerminated()` to clear
- * past terminations and wait for new terminations.
+ * return immediately (if the query was terminated by `query.stop()`),
+ * or throw the exception immediately (if the query was terminated with exception). Use
+ * `resetTerminated()` to clear past terminations and wait for new terminations.
*
- * For correctly documenting exceptions across multiple queries, users need to stop all of them
- * after any of them terminates with exception, and then check the `query.exception()` for each
- * query.
+ * In the case where multiple queries have terminated since `resetTerminated()` was called,
+ * if any query has terminated with an exception, then `awaitAnyTermination()` will
+ * throw one of those exceptions. For correctly documenting exceptions across multiple queries,
+ * users need to stop all of them after any of them terminates with exception, and then check the
+ * `query.exception()` for each query.
*
+ * @throws StreamingQueryException if any query has terminated with an exception
* @since 3.5.0
*/
- // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
- // framework is supported and modify the doc accordingly.
+ @throws[StreamingQueryException]
def awaitAnyTermination(): Unit = {
executeManagerCmd(_.getAwaitAnyTerminationBuilder.build())
}

/**
- * Wait until any of the queries on the associated SQLContext has terminated since the creation
- * of the context, or since `resetTerminated()` was called. Returns whether any query has
- * terminated or not (multiple may have terminated). If any query has terminated with an
+ * Wait until any of the queries on the associated SQLContext has terminated since the
+ * creation of the context, or since `resetTerminated()` was called. Returns whether any query
+ * has terminated or not (multiple may have terminated). If any query has terminated with an
* exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
- * return `true` immediately (if the query was terminated by `query.stop()`), or throw the
- * exception immediately (if the query was terminated with exception). Use `resetTerminated()`
- * to clear past terminations and wait for new terminations.
+ * return `true` immediately (if the query was terminated by `query.stop()`),
+ * or throw the exception immediately (if the query was terminated with exception). Use
+ * `resetTerminated()` to clear past terminations and wait for new terminations.
*
- * For correctly documenting exceptions across multiple queries, users need to stop all of them
- * after any of them terminates with exception, and then check the `query.exception()` for each
- * query.
+ * In the case where multiple queries have terminated since `resetTerminated()` was called,
+ * if any query has terminated with an exception, then `awaitAnyTermination()` will
+ * throw one of those exceptions. For correctly documenting exceptions across multiple queries,
+ * users need to stop all of them after any of them terminates with exception, and then check the
+ * `query.exception()` for each query.
*
+ * @throws StreamingQueryException if any query has terminated with an exception
* @since 3.5.0
*/
- // TODO(SPARK-43299): verity the behavior of this method after JVM client-side error-handling
- // framework is supported and modify the doc accordingly.
+ @throws[StreamingQueryException]
def awaitAnyTermination(timeoutMs: Long): Boolean = {
require(timeoutMs > 0, "Timeout has to be positive")
executeManagerCmd(
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index f995a0c3d947..343a1cce8475 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -214,6 +214,45 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
}
}

+ test("throw exception in streaming, manager") {
+ // Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the
+ // netty header limit.
+ withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") { + val session = spark + import session.implicits._ + + val checkForTwo = udf((value: Int) => { + if (value == 2) { + throw new RuntimeException("Number 2 encountered!") + } + value + }) + + spark.readStream + .format("rate") + .option("rowsPerSecond", "1") + .load() + .select(checkForTwo($"value").as("checkedValue")) + .writeStream + .outputMode("append") + .format("console") + .start() + + val exception = intercept[StreamingQueryException] { + spark.streams.awaitAnyTermination() + } + + assert(exception.getErrorClass != null) + assert(!exception.getMessageParameters.isEmpty) + assert(exception.getCause.isInstanceOf[SparkException]) + assert(exception.getCause.getCause.isInstanceOf[SparkException]) + assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) + assert( + exception.getCause.getCause.getCause.getMessage + .contains("java.lang.RuntimeException: Number 2 encountered!")) + } + } + test("foreach Row") { val writer = new TestForeachWriter[Row] diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 463212290877..a717f67e7e13 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -87,9 +87,6 @@ object MimaExcludes { ProblemFilters.exclude[Problem]("org.sparkproject.spark_protobuf.protobuf.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.utils.SchemaConverters.*"), - // SPARK-43299: Convert StreamingQueryException in Scala Client - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException"), - // SPARK-45856: Move ArtifactManager from Spark Connect into SparkSession (sql/core) ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.userId"), From 21acd129185092c36c097c33ac37769d26a2e8ce Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 11 Dec 2023 15:57:38 -0800 Subject: [PATCH 2/8] done --- .../streaming/StreamingQueryException.scala | 9 ++- .../streaming/ClientStreamingQuerySuite.scala | 73 +++++++++++-------- .../execution/streaming/StreamExecution.scala | 6 +- 3 files changed, 55 insertions(+), 33 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 1fde16bfa4e3..549fd83979b0 100644 --- a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -47,7 +47,14 @@ class StreamingQueryException private[sql]( cause: Throwable, errorClass: String, messageParameters: Map[String, String]) = { - this("", message, cause, null, null, errorClass, messageParameters) + this( + messageParameters.get("queryDebugString").getOrElse(null), + message, + cause, + messageParameters.get("startOffset").getOrElse(null), + messageParameters.get("endOffset").getOrElse(null), + errorClass, + messageParameters) } def this( diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala index 343a1cce8475..362d0ffd91bc 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala +++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala @@ -178,39 +178,47 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { test("throw exception in streaming") { // Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the // netty header limit. - withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") { - val session = spark - import session.implicits._ + try { + withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") { + val session = spark + import session.implicits._ - val checkForTwo = udf((value: Int) => { - if (value == 2) { - throw new RuntimeException("Number 2 encountered!") - } - value - }) + val checkForTwo = udf((value: Int) => { + if (value == 2) { + throw new RuntimeException("Number 2 encountered!") + } + value + }) - val query = spark.readStream - .format("rate") - .option("rowsPerSecond", "1") - .load() - .select(checkForTwo($"value").as("checkedValue")) - .writeStream - .outputMode("append") - .format("console") - .start() + val query = spark.readStream + .format("rate") + .option("rowsPerSecond", "1") + .load() + .select(checkForTwo($"value").as("checkedValue")) + .writeStream + .outputMode("append") + .format("console") + .start() - val exception = intercept[StreamingQueryException] { - query.awaitTermination() - } + val exception = intercept[StreamingQueryException] { + query.awaitTermination() + } - assert(exception.getErrorClass != null) - assert(!exception.getMessageParameters.isEmpty) - assert(exception.getCause.isInstanceOf[SparkException]) - assert(exception.getCause.getCause.isInstanceOf[SparkException]) - assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) - assert( - exception.getCause.getCause.getCause.getMessage - .contains("java.lang.RuntimeException: Number 2 encountered!")) + assert(exception.getErrorClass != null) + assert(exception.getMessageParameters().get("id") == query.id.toString) + assert(exception.getMessageParameters().get("runId") == query.runId.toString) + assert(exception.getMessageParameters().get("startOffset") != null) + assert(exception.getMessageParameters().get("endOffset") != null) + assert(exception.getCause.isInstanceOf[SparkException]) + assert(exception.getCause.getCause.isInstanceOf[SparkException]) + assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) + assert( + exception.getCause.getCause.getCause.getMessage + .contains("java.lang.RuntimeException: Number 2 encountered!")) + } + } + finally { + spark.streams.resetTerminated() } } @@ -228,7 +236,7 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { value }) - spark.readStream + val query = spark.readStream .format("rate") .option("rowsPerSecond", "1") .load() @@ -243,7 +251,10 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { } assert(exception.getErrorClass != null) - assert(!exception.getMessageParameters.isEmpty) + assert(exception.getMessageParameters().get("id") == query.id.toString) + assert(exception.getMessageParameters().get("runId") == query.runId.toString) + assert(exception.getMessageParameters().get("startOffset") != null) + assert(exception.getMessageParameters().get("endOffset") != null) assert(exception.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d9b3b207976c..aac26a727689 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -329,7 +329,11 @@ abstract class StreamExecution( messageParameters = Map( "id" -> id.toString, "runId" -> runId.toString, - "message" -> message)) + "message" -> message, + "queryDebugString" -> toDebugString(includeLogicalPlan = isInitialized), + "startOffset" -> committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString, + "endOffset" -> availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString + )) logError(s"Query $prettyIdString terminated with error", e) updateStatusMessage(s"Terminated with exception: $message") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to From b957e921602ccb219dc6967a4f670b92837dd61f Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 11 Dec 2023 15:58:24 -0800 Subject: [PATCH 3/8] add back mima skip --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a717f67e7e13..463212290877 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -87,6 +87,9 @@ object MimaExcludes { ProblemFilters.exclude[Problem]("org.sparkproject.spark_protobuf.protobuf.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.utils.SchemaConverters.*"), + // SPARK-43299: Convert StreamingQueryException in Scala Client + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException"), + // SPARK-45856: Move ArtifactManager from Spark Connect into SparkSession (sql/core) ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.userId"), From 55fab9462c78311c59da7d9d7b08ec418243b8cd Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 12 Dec 2023 11:02:59 -0800 Subject: [PATCH 4/8] retrigger From ccd7c324a6f3194d9ecd28524346418b31885c95 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 12 Dec 2023 13:56:28 -0800 Subject: [PATCH 5/8] use empty string as default instead --- .../spark/sql/streaming/StreamingQueryException.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 549fd83979b0..800a6dcfda8d 100644 --- a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -48,11 +48,11 @@ class StreamingQueryException private[sql]( errorClass: String, messageParameters: Map[String, String]) = { this( - messageParameters.get("queryDebugString").getOrElse(null), + messageParameters.get("queryDebugString").getOrElse(""), message, cause, - messageParameters.get("startOffset").getOrElse(null), - messageParameters.get("endOffset").getOrElse(null), + messageParameters.get("startOffset").getOrElse(""), + messageParameters.get("endOffset").getOrElse(""), errorClass, messageParameters) } From 658c54deae38381d68dd873a88907d5f1b814877 Mon Sep 17 
00:00:00 2001 From: Wei Liu Date: Tue, 12 Dec 2023 15:26:09 -0800 Subject: [PATCH 6/8] verify using isEmpty --- .../sql/streaming/ClientStreamingQuerySuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala index 362d0ffd91bc..145117bb482b 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala @@ -207,8 +207,8 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { assert(exception.getErrorClass != null) assert(exception.getMessageParameters().get("id") == query.id.toString) assert(exception.getMessageParameters().get("runId") == query.runId.toString) - assert(exception.getMessageParameters().get("startOffset") != null) - assert(exception.getMessageParameters().get("endOffset") != null) + assert(!exception.getMessageParameters().get("startOffset").isEmpty) + assert(!exception.getMessageParameters().get("endOffset").isEmpty) assert(exception.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) @@ -222,7 +222,7 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { } } - test("throw exception in streaming, manager") { + test("throw exception in streaming, check with StreamingQueryManager") { // Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the // netty header limit. 
withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") { @@ -253,8 +253,8 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { assert(exception.getErrorClass != null) assert(exception.getMessageParameters().get("id") == query.id.toString) assert(exception.getMessageParameters().get("runId") == query.runId.toString) - assert(exception.getMessageParameters().get("startOffset") != null) - assert(exception.getMessageParameters().get("endOffset") != null) + assert(!exception.getMessageParameters().get("startOffset").isEmpty) + assert(!exception.getMessageParameters().get("endOffset").isEmpty) assert(exception.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.isInstanceOf[SparkException]) assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) From 6454e5a4049d3d894d4f932b9b8ddf29e9d1bec8 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 13 Dec 2023 10:06:04 -0800 Subject: [PATCH 7/8] fmt --- .../spark/sql/streaming/StreamingQuery.scala | 21 +++++---- .../sql/streaming/StreamingQueryManager.scala | 46 ++++++++++--------- .../streaming/ClientStreamingQuerySuite.scala | 3 +- 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 55ed0986d384..13a26fa79085 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -109,30 +109,31 @@ trait StreamingQuery { def lastProgress: StreamingQueryProgress /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be thrown. + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If + * the query has terminated with an exception, then the exception will be thrown. * * If the query has terminated, then all subsequent calls to this method will either return - * immediately (if the query was terminated by `stop()`), or throw the exception - * immediately (if the query has terminated with exception). + * immediately (if the query was terminated by `stop()`), or throw the exception immediately (if + * the query has terminated with exception). * - * @throws StreamingQueryException if the query has terminated with an exception. + * @throws StreamingQueryException + * if the query has terminated with an exception. * @since 3.5.0 */ @throws[StreamingQueryException] def awaitTermination(): Unit /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be thrown. - * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` - * milliseconds. + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. If + * the query has terminated with an exception, then the exception will be thrown. Otherwise, it + * returns whether the query has terminated or not within the `timeoutMs` milliseconds. 
*
* If the query has terminated, then all subsequent calls to this method will either return
* `true` immediately (if the query was terminated by `stop()`), or throw the exception
* immediately (if the query has terminated with exception).
*
- * @throws StreamingQueryException if the query has terminated with an exception
+ * @throws StreamingQueryException
+ * if the query has terminated with an exception
* @since 3.5.0
*/
@throws[StreamingQueryException]
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 7d8791afb4a9..fd33efd72193 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -83,22 +83,23 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}

/**
- * Wait until any of the queries on the associated SQLContext has terminated since the
- * creation of the context, or since `resetTerminated()` was called. If any query was terminated
- * with an exception, then the exception will be thrown.
+ * Wait until any of the queries on the associated SQLContext has terminated since the creation
+ * of the context, or since `resetTerminated()` was called. If any query was terminated with an
+ * exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
- * return immediately (if the query was terminated by `query.stop()`),
- * or throw the exception immediately (if the query was terminated with exception). Use
- * `resetTerminated()` to clear past terminations and wait for new terminations.
+ * return immediately (if the query was terminated by `query.stop()`), or throw the exception
+ * immediately (if the query was terminated with exception). Use `resetTerminated()` to clear
+ * past terminations and wait for new terminations.
*
- * In the case where multiple queries have terminated since `resetTerminated()` was called,
- * if any query has terminated with an exception, then `awaitAnyTermination()` will
- * throw one of those exceptions. For correctly documenting exceptions across multiple queries,
- * users need to stop all of them after any of them terminates with exception, and then check the
+ * In the case where multiple queries have terminated since `resetTerminated()` was called, if
+ * any query has terminated with an exception, then `awaitAnyTermination()` will throw one of
+ * those exceptions. For correctly documenting exceptions across multiple queries, users need
+ * to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
- * @throws StreamingQueryException if any query has terminated with an exception
+ * @throws StreamingQueryException
+ * if any query has terminated with an exception
* @since 3.5.0
*/
@throws[StreamingQueryException]
@@ -107,23 +108,24 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}

/**
- * Wait until any of the queries on the associated SQLContext has terminated since the
- * creation of the context, or since `resetTerminated()` was called. Returns whether any query
- * has terminated or not (multiple may have terminated). If any query has terminated with an
+ * Wait until any of the queries on the associated SQLContext has terminated since the creation
+ * of the context, or since `resetTerminated()` was called. Returns whether any query has
+ * terminated or not (multiple may have terminated). If any query has terminated with an
* exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
- * return `true` immediately (if the query was terminated by `query.stop()`),
- * or throw the exception immediately (if the query was terminated with exception). Use
- * `resetTerminated()` to clear past terminations and wait for new terminations.
+ * return `true` immediately (if the query was terminated by `query.stop()`), or throw the
+ * exception immediately (if the query was terminated with exception). Use `resetTerminated()`
+ * to clear past terminations and wait for new terminations.
*
- * In the case where multiple queries have terminated since `resetTerminated()` was called,
- * if any query has terminated with an exception, then `awaitAnyTermination()` will
- * throw one of those exceptions. For correctly documenting exceptions across multiple queries,
- * users need to stop all of them after any of them terminates with exception, and then check the
+ * In the case where multiple queries have terminated since `resetTerminated()` was called, if
+ * any query has terminated with an exception, then `awaitAnyTermination()` will throw one of
+ * those exceptions. For correctly documenting exceptions across multiple queries, users need
+ * to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
- * @throws StreamingQueryException if any query has terminated with an exception
+ * @throws StreamingQueryException
+ * if any query has terminated with an exception
* @since 3.5.0
*/
@throws[StreamingQueryException]
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index 145117bb482b..ac78b06bf5bc 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -216,8 +216,7 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
exception.getCause.getCause.getCause.getMessage
.contains("java.lang.RuntimeException: Number 2 encountered!"))
}
- }
- finally {
+ } finally {
spark.streams.resetTerminated()
}
}

From 6b45cea05ddf1e7a31ddfa24e0b7b28caa659da1 Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Tue, 2 Jan 2024 12:06:20 -0800
Subject: [PATCH 8/8] remove jvmStacktrace sql conf

---
 .../streaming/ClientStreamingQuerySuite.scala | 90 +++++++++----------
 1 file changed, 41 insertions(+), 49 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index ac78b06bf5bc..91c562c0f98b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -176,55 +176,7 @@ class 
ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { } test("throw exception in streaming") { - // Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the - // netty header limit. try { - withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") { - val session = spark - import session.implicits._ - - val checkForTwo = udf((value: Int) => { - if (value == 2) { - throw new RuntimeException("Number 2 encountered!") - } - value - }) - - val query = spark.readStream - .format("rate") - .option("rowsPerSecond", "1") - .load() - .select(checkForTwo($"value").as("checkedValue")) - .writeStream - .outputMode("append") - .format("console") - .start() - - val exception = intercept[StreamingQueryException] { - query.awaitTermination() - } - - assert(exception.getErrorClass != null) - assert(exception.getMessageParameters().get("id") == query.id.toString) - assert(exception.getMessageParameters().get("runId") == query.runId.toString) - assert(!exception.getMessageParameters().get("startOffset").isEmpty) - assert(!exception.getMessageParameters().get("endOffset").isEmpty) - assert(exception.getCause.isInstanceOf[SparkException]) - assert(exception.getCause.getCause.isInstanceOf[SparkException]) - assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) - assert( - exception.getCause.getCause.getCause.getMessage - .contains("java.lang.RuntimeException: Number 2 encountered!")) - } - } finally { - spark.streams.resetTerminated() - } - } - - test("throw exception in streaming, check with StreamingQueryManager") { - // Disable spark.sql.pyspark.jvmStacktrace.enabled to avoid hitting the - // netty header limit. - withSQLConf("spark.sql.pyspark.jvmStacktrace.enabled" -> "false") { val session = spark import session.implicits._ @@ -246,7 +198,7 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { .start() val exception = intercept[StreamingQueryException] { - spark.streams.awaitAnyTermination() + query.awaitTermination() } assert(exception.getErrorClass != null) @@ -260,9 +212,49 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { assert( exception.getCause.getCause.getCause.getMessage .contains("java.lang.RuntimeException: Number 2 encountered!")) + } finally { + spark.streams.resetTerminated() } } + test("throw exception in streaming, check with StreamingQueryManager") { + val session = spark + import session.implicits._ + + val checkForTwo = udf((value: Int) => { + if (value == 2) { + throw new RuntimeException("Number 2 encountered!") + } + value + }) + + val query = spark.readStream + .format("rate") + .option("rowsPerSecond", "1") + .load() + .select(checkForTwo($"value").as("checkedValue")) + .writeStream + .outputMode("append") + .format("console") + .start() + + val exception = intercept[StreamingQueryException] { + spark.streams.awaitAnyTermination() + } + + assert(exception.getErrorClass != null) + assert(exception.getMessageParameters().get("id") == query.id.toString) + assert(exception.getMessageParameters().get("runId") == query.runId.toString) + assert(!exception.getMessageParameters().get("startOffset").isEmpty) + assert(!exception.getMessageParameters().get("endOffset").isEmpty) + assert(exception.getCause.isInstanceOf[SparkException]) + assert(exception.getCause.getCause.isInstanceOf[SparkException]) + assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException]) + assert( + exception.getCause.getCause.getCause.getMessage + 
.contains("java.lang.RuntimeException: Number 2 encountered!")) + } + test("foreach Row") { val writer = new TestForeachWriter[Row]