diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ed7a9100cc7f..75cb5fd52cc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -651,7 +651,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     try {
       val start = System.nanoTime()
       // call `QueryExecution.toRDD` to trigger the execution of commands.
-      SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
+      SQLExecution.withNewExecutionId(session, qe, ds.sqlText)(qe.toRdd)
       val end = System.nanoTime()
       session.listenerManager.onSuccess(name, qe, end - start)
     } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0aee1d7be578..1f91cbe2b573 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,18 +61,20 @@ import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
-  def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
-    val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
+  def apply[T: Encoder](sparkSession: SparkSession,
+      logicalPlan: LogicalPlan, sqlText: String = ""): Dataset[T] = {
+    val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]], sqlText)
     // Eagerly bind the encoder so we verify that the encoder matches the underlying
     // schema. The user will get an error if this is not the case.
     dataset.deserializer
     dataset
   }
 
-  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
+  def ofRows(sparkSession: SparkSession,
+      logicalPlan: LogicalPlan, sqlText: String = ""): DataFrame = {
     val qe = sparkSession.sessionState.executePlan(logicalPlan)
     qe.assertAnalyzed()
-    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema), sqlText)
   }
 }
 
@@ -166,7 +168,8 @@ private[sql] object Dataset {
 class Dataset[T] private[sql](
     @transient val sparkSession: SparkSession,
     @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
-    encoder: Encoder[T])
+    encoder: Encoder[T],
+    val sqlText: String = "")
   extends Serializable {
 
   queryExecution.assertAnalyzed()
@@ -174,12 +177,19 @@ class Dataset[T] private[sql](
   // Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure
   // you wrap it with `withNewExecutionId` if this actions doesn't call other action.
 
-  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
-    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
+  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan,
+      encoder: Encoder[T]) = {
+    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, "")
   }
 
-  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
-    this(sqlContext.sparkSession, logicalPlan, encoder)
+  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan,
+      encoder: Encoder[T], sqlText: String) = {
+    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, sqlText)
+  }
+
+  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan,
+      encoder: Encoder[T], sqlText: String) = {
+    this(sqlContext.sparkSession, logicalPlan, encoder, sqlText)
   }
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
@@ -390,7 +400,8 @@ class Dataset[T] private[sql](
    */
   // This is declared with parentheses to prevent the Scala compiler from treating
   // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
-  def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
+  def toDF(): DataFrame =
+    new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema), sqlText)
 
   /**
    * :: Experimental ::
@@ -622,7 +633,8 @@ class Dataset[T] private[sql](
         outputPartitioning,
         physicalPlan.outputOrdering,
         isStreaming
-      )(sparkSession)).as[T]
+      )(sparkSession),
+        sqlText).as[T]
   }
 
   /**
@@ -1364,10 +1376,11 @@ class Dataset[T] private[sql](
       planWithBarrier)
 
     if (encoder.flat) {
-      new Dataset[U1](sparkSession, project, encoder)
+      new Dataset[U1](sparkSession, project, encoder, sqlText)
     } else {
       // Flattens inner fields of U1
-      new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
+      new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder), sqlText)
+        .map(_._1)
     }
   }
 
@@ -1381,7 +1394,7 @@ class Dataset[T] private[sql](
     val namedColumns =
       columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
     val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
-    new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
+    new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders), sqlText)
   }
 
   /**
@@ -2023,7 +2036,7 @@ class Dataset[T] private[sql](
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
       new Dataset[T](
-        sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder)
+        sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder, sqlText)
     }.toArray
   }
 
@@ -2583,7 +2596,7 @@ class Dataset[T] private[sql](
     new Dataset[U](
       sparkSession,
       MapPartitions[T, U](func, planWithBarrier),
-      implicitly[Encoder[U]])
+      implicitly[Encoder[U]], sqlText)
   }
 
   /**
@@ -2613,7 +2626,8 @@ class Dataset[T] private[sql](
     val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
     Dataset.ofRows(
       sparkSession,
-      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
+      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier),
+      sqlText)
   }
 
   /**
@@ -2766,7 +2780,7 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
+  def count(): Long = withAction("count", groupBy().count().queryExecution, true) { plan =>
     plan.executeCollect().head.getLong(0)
   }
 
@@ -3222,7 +3236,7 @@ class Dataset[T] private[sql](
    * an execution.
    */
   private def withNewExecutionId[U](body: => U): U = {
-    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution, sqlText)(body)
   }
 
   /**
@@ -3231,7 +3245,7 @@ class Dataset[T] private[sql](
    * reset.
    */
   private def withNewRDDExecutionId[U](body: => U): U = {
-    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
+    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution, sqlText) {
       rddQueryExecution.executedPlan.foreach { plan =>
         plan.resetMetrics()
       }
@@ -3243,13 +3257,17 @@ class Dataset[T] private[sql](
    * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
    * user-registered callback functions.
    */
-  private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
+  private def withAction[U](
+      name: String,
+      qe: QueryExecution,
+      hideSqlText: Boolean = false)(action: SparkPlan => U) = {
     try {
       qe.executedPlan.foreach { plan =>
         plan.resetMetrics()
       }
       val start = System.nanoTime()
-      val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
+      val result = SQLExecution.withNewExecutionId(sparkSession, qe,
+        if (hideSqlText) "" else sqlText) {
         action(qe.executedPlan)
       }
       val end = System.nanoTime()
@@ -3292,21 +3310,21 @@ class Dataset[T] private[sql](
 
   /** A convenient function to wrap a logical plan and produce a DataFrame. */
   @inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
-    Dataset.ofRows(sparkSession, logicalPlan)
+    Dataset.ofRows(sparkSession, logicalPlan, sqlText)
   }
 
   /** A convenient function to wrap a logical plan and produce a Dataset. */
   @inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
-    Dataset(sparkSession, logicalPlan)
+    Dataset(sparkSession, logicalPlan, sqlText)
   }
 
   /** A convenient function to wrap a set based logical plan and produce a Dataset. */
   @inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
     if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
       // Set operators widen types (change the schema), so we cannot reuse the row encoder.
-      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
+      Dataset.ofRows(sparkSession, logicalPlan, sqlText).asInstanceOf[Dataset[U]]
     } else {
-      Dataset(sparkSession, logicalPlan)
+      Dataset(sparkSession, logicalPlan, sqlText)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734573ba31f7..349f292517fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -146,6 +146,8 @@ class SparkSession private(
     }
   }
 
+  lazy private val substitutor = new VariableSubstitution(sessionState.conf)
+
   /**
   * A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
   *
@@ -635,7 +637,8 @@ class SparkSession private(
   * @since 2.0.0
   */
  def sql(sqlText: String): DataFrame = {
-    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
+    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText),
+      substitutor.substitute(sqlText))
  }
 
  /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e991da7df0bd..c2f418af4d76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -58,7 +58,8 @@ object SQLExecution {
    */
   def withNewExecutionId[T](
       sparkSession: SparkSession,
-      queryExecution: QueryExecution)(body: => T): T = {
+      queryExecution: QueryExecution,
+      sqlText: String = "")(body: => T): T = {
     val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     val executionId = SQLExecution.nextExecutionId
@@ -69,10 +70,10 @@ object SQLExecution {
       // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
       // streaming queries would give us call site like "run at <unknown>:0"
       val callSite = sparkSession.sparkContext.getCallSite()
-
       sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
         executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
-        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
+        System.currentTimeMillis(), sqlText))
       try {
         body
       } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6e231970f4a2..b43ff0654940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -503,7 +503,7 @@ class MicroBatchExecution(
       new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
 
     reportTimeTaken("addBatch") {
-      SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
+      SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution, nextBatch.sqlText) {
         sink match {
           case s: Sink => s.addBatch(currentBatchId, nextBatch)
           case _: StreamWriteSupport =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index e0554f0c4d33..1b83cd2c95ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -78,6 +78,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
 
       summary ++
         planVisualization(metrics, graph) ++
+        showSQLText(executionUIData.sqlText) ++
         physicalPlanDescription(executionUIData.physicalPlanDescription)
     }.getOrElse {
       <div>No information to display for query {executionId}</div>
@@ -120,6 +121,25 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
   private def jobURL(jobId: Long): String =
     "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
 
+  private def showSQLText(sqlText: String): Seq[Node] = {
+
+
+
+        SQL text
+
+
+
+
+  }
+
   private def physicalPlanDescription(physicalPlanDescription: String): Seq[Node] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 53fb9a0cc21c..f2750787fe8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -230,7 +230,7 @@ class SQLAppStatusListener(
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
     val SparkListenerSQLExecutionStart(executionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time) = event
+      physicalPlanDescription, sparkPlanInfo, time, sqlText) = event
 
     def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
       nodes.map {
@@ -265,6 +265,7 @@ class SQLAppStatusListener(
     exec.physicalPlanDescription = physicalPlanDescription
     exec.metrics = sqlPlanMetrics
     exec.submissionTime = time
+    exec.sqlText = sqlText
     update(exec)
   }
 
@@ -351,6 +352,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var jobs = Map[Int, JobExecutionStatus]()
   var stages = Set[Int]()
   var driverAccumUpdates = Map[Long, Long]()
+  var sqlText: String = null
 
   @volatile var metricsValues: Map[Long, String] = null
 
@@ -369,7 +371,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       completionTime,
       jobs,
       stages,
-      metricsValues)
+      metricsValues,
+      sqlText)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 9a76584717f4..4022238e1c9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -91,7 +91,8 @@ class SQLExecutionUIData(
     * from the SQL listener instance.
     */
    @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String]) {
+    val metricValues: Map[Long, String],
+    val sqlText: String) {
 
   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b58b8c6d45e5..3c69db657215 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -34,7 +34,8 @@ case class SparkListenerSQLExecutionStart(
     details: String,
     physicalPlanDescription: String,
     sparkPlanInfo: SparkPlanInfo,
-    time: Long)
+    time: Long,
+    sqlText: String)
   extends SparkListenerEvent
 
 @DeveloperApi
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index c2e62b987e0c..75da2de95215 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -41,12 +41,13 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
         |    "metadata":{},
         |    "metrics":[]
         |  },
-        |  "time":0
+        |  "time":0,
+        |  "sqlText":"select 1 as a"
         |}
       """.stripMargin
     val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
     val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
-      new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
+      new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0, "select 1 as a")
     assert(reconstructedEvent == expectedEvent)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 85face3994fd..582dcd04dfb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -166,7 +166,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      System.currentTimeMillis(),
+      "select 1 as a"))
 
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
@@ -298,7 +299,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      System.currentTimeMillis(),
+      "select 1 as a"))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -327,7 +329,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      System.currentTimeMillis(),
+      "select 1 as a"))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -367,7 +370,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      System.currentTimeMillis(),
+      "select 1 as a"))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -396,7 +400,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      System.currentTimeMillis(),
+      "select 1 as a"))
 
     var stageId = 0
     def twoStageJob(jobId: Int): Unit = {
@@ -521,13 +526,15 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     val df = createTestDataFrame
     // Start execution 1 and execution 2
     time += 1
+    val sqlText = "select 1 as a"
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       1,
       "test",
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      time))
+      time,
+      sqlText))
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       2,
@@ -535,7 +542,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      time))
+      time,
+      sqlText))
 
     // Stop execution 2 before execution 1
     time += 1
@@ -551,7 +559,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      time))
+      time,
+      sqlText))
     assert(statusStore.executionsCount === 2)
     assert(statusStore.execution(2) === None)
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 677590217344..5c665a93adcf 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -60,7 +60,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
     try {
       context.sparkContext.setJobDescription(command)
       val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
-      hiveResponse = SQLExecution.withNewExecutionId(context.sparkSession, execution) {
+      hiveResponse = SQLExecution.withNewExecutionId(context.sparkSession, execution, command) {
         execution.hiveResultString()
       }
       tableSchema = getResultSetSchema(execution)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 272e6f51f500..5c043b26a2cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -345,7 +345,9 @@ abstract class HiveComparisonTest
       val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
         val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
         def getResult(): Seq[String] = {
-          SQLExecution.withNewExecutionId(query.sparkSession, query)(query.hiveResultString())
+          SQLExecution.withNewExecutionId(query.sparkSession, query, queryString) {
+            query.hiveResultString()
+          }
         }
         try { (query, prepareAnswer(query, getResult())) } catch { case e: Throwable =>