diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ef9ef0659fb40..bf051f60b7a57 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -426,10 +426,6 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$"), - // [SPARK-28556][SQL] QueryExecutionListener should also notify Error - ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), - // [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 01f81825f6bfd..0b5951ec2ac97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException} import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.util.{ListenerBus, Utils} @@ -55,12 +55,13 @@ trait QueryExecutionListener { * @param funcName the name of the action that triggered this query. * @param qe the QueryExecution object that carries detail information like logical plan, * physical plan, etc. - * @param error the error that failed this query. - * + * @param exception the exception that failed this query. If `java.lang.Error` is thrown during + * execution, it will be wrapped with an `Exception` and it can be accessed by + * `exception.getCause`. * @note This can be invoked by multiple different threads. */ @DeveloperApi - def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit + def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit } @@ -140,7 +141,14 @@ private[sql] class ExecutionListenerBus(session: SparkSession) val funcName = event.executionName.get event.executionFailure match { case Some(ex) => - listener.onFailure(funcName, event.qe, ex) + val exception = ex match { + case e: Exception => e + case other: Throwable => + val message = "Hit an error when executing a query" + + (if (other.getMessage == null) "" else s": ${other.getMessage}") + new QueryExecutionException(message, other) + } + listener.onFailure(funcName, event.qe, exception) case _ => listener.onSuccess(funcName, event.qe, event.duration) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index cd157086a8b8e..ac2ebd8bd748b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -71,7 +71,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } spark.listenerManager.register(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala index 31957a99e15af..003f5bc835d5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala @@ -135,7 +135,7 @@ class SessionStateSuite extends SparkFunSuite { test("fork new session and inherit listener manager") { class CommandCollector extends QueryExecutionListener { val commands: ArrayBuffer[String] = ArrayBuffer.empty[String] - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable) : Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { commands += funcName } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala index fd6bc9662bfad..d2a6358ee822b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala @@ -28,7 +28,7 @@ class TestQueryExecutionListener extends QueryExecutionListener { OnSuccessCall.isOnSuccessCalled.set(true) } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index cbe2e91a20d61..e0857ed6bc35a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -341,7 +341,7 @@ class UDFSuite extends QueryTest with SharedSparkSession { withTempPath { path => var numTotalCachedHit = 0 val listener = new QueryExecutionListener { - override def onFailure(f: String, qe: QueryExecution, e: Throwable): Unit = {} + override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { qe.withCachedData match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 0a6897b829994..7c7afa9cfbd41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -141,7 +141,7 @@ class DataSourceV2DataFrameSuite override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index b0da2eb697f36..51d734279414a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -157,13 +157,13 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { val commands = ArrayBuffer.empty[(String, LogicalPlan)] - val errors = ArrayBuffer.empty[(String, Throwable)] + val exceptions = ArrayBuffer.empty[(String, Exception)] val listener = new QueryExecutionListener { override def onFailure( funcName: String, qe: QueryExecution, - error: Throwable): Unit = { - errors += funcName -> error + exception: Exception): Unit = { + exceptions += funcName -> exception } override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index 9693a10f9afca..550bec7505422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -216,7 +216,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } spark.listenerManager.register(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index fb939007697c2..9747840ce4032 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -282,7 +282,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } spark.listenerManager.register(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 6881812286b24..b17c93503804c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -20,15 +20,17 @@ package org.apache.spark.sql.util import scala.collection.mutable.ArrayBuffer import org.apache.spark._ -import org.apache.spark.sql.{functions, AnalysisException, QueryTest, Row} +import org.apache.spark.sql.{functions, AnalysisException, Dataset, QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, InsertIntoStatement, LogicalPlan, Project} -import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.{CreateTable, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.json.JsonFileFormat -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StringType class DataFrameCallbackSuite extends QueryTest with SharedSparkSession @@ -40,7 +42,7 @@ class DataFrameCallbackSuite extends QueryTest val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metrics += ((funcName, qe, duration)) @@ -67,10 +69,10 @@ class DataFrameCallbackSuite extends QueryTest } testQuietly("execute callback functions when a DataFrame action failed") { - val metrics = ArrayBuffer.empty[(String, QueryExecution, Throwable)] + val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)] val listener = new QueryExecutionListener { - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { - metrics += ((funcName, qe, error)) + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + metrics += ((funcName, qe, exception)) } // Only test failed case here, so no need to implement `onSuccess` @@ -96,7 +98,7 @@ class DataFrameCallbackSuite extends QueryTest val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { val metric = stripAQEPlan(qe.executedPlan) match { @@ -136,7 +138,7 @@ class DataFrameCallbackSuite extends QueryTest val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metrics += qe.executedPlan.longMetric("dataSize").value @@ -176,10 +178,10 @@ class DataFrameCallbackSuite extends QueryTest test("execute callback functions for DataFrameWriter") { val commands = ArrayBuffer.empty[(String, LogicalPlan)] - val errors = ArrayBuffer.empty[(String, Throwable)] + val exceptions = ArrayBuffer.empty[(String, Exception)] val listener = new QueryExecutionListener { - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { - errors += funcName -> error + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + exceptions += funcName -> exception } override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { @@ -225,9 +227,9 @@ class DataFrameCallbackSuite extends QueryTest spark.range(10).select($"id", $"id").write.insertInto("tab") } sparkContext.listenerBus.waitUntilEmpty() - assert(errors.length == 1) - assert(errors.head._1 == "insertInto") - assert(errors.head._2 == e) + assert(exceptions.length == 1) + assert(exceptions.head._1 == "insertInto") + assert(exceptions.head._2 == e) } } @@ -238,7 +240,7 @@ class DataFrameCallbackSuite extends QueryTest metricMaps += qe.observedMetrics } - override def onFailure(funcName: String, qe: QueryExecution, exception: Throwable): Unit = { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } @@ -278,4 +280,32 @@ class DataFrameCallbackSuite extends QueryTest spark.listenerManager.unregister(listener) } } + + testQuietly("SPARK-31144: QueryExecutionListener should receive `java.lang.Error`") { + var e: Exception = null + val listener = new QueryExecutionListener { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + e = exception + } + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {} + } + spark.listenerManager.register(listener) + + intercept[Error] { + Dataset.ofRows(spark, ErrorTestCommand("foo")).collect() + } + sparkContext.listenerBus.waitUntilEmpty() + assert(e != null && e.isInstanceOf[QueryExecutionException] + && e.getCause.isInstanceOf[Error] && e.getCause.getMessage == "foo") + spark.listenerManager.unregister(listener) + } +} + +/** A test command that throws `java.lang.Error` during execution. */ +case class ErrorTestCommand(foo: String) extends RunnableCommand { + + override val output: Seq[Attribute] = Seq(AttributeReference("foo", StringType)()) + + override def run(sparkSession: SparkSession): Seq[Row] = + throw new java.lang.Error(foo) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala index 2fd6cb220ea3f..ab854a0281c6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala @@ -57,7 +57,7 @@ private class CountingQueryExecutionListener extends QueryExecutionListener { CALLBACK_COUNT.incrementAndGet() } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { CALLBACK_COUNT.incrementAndGet() } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala index d056b3b2153cf..4564c2209a931 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.util.QueryExecutionListener class DummyQueryExecutionListener extends QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } class DummyStreamingQueryListener extends StreamingQueryListener {