diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734573ba31f7..2f4ca217e95b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.ui.SparkListenerSQLTextCaptured
 import org.apache.spark.sql.internal._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.BaseRelation
@@ -146,6 +147,9 @@ class SparkSession private(
     }
   }
 
+  // Expands ${var} references in SQL text before it is published to the UI.
+  // Lazy because sessionState is itself lazily initialized.
+  private lazy val substitutor = new VariableSubstitution(sessionState.conf)
+
   /**
    * A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
@@ -635,6 +638,12 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
+    // Publish the (variable-substituted) query text to the listener bus so the
+    // SQL UI tab can display it. NOTE(review): the event is posted even when
+    // parsing below throws, and the parser receives the raw, un-substituted
+    // text — confirm both behaviors are intended.
+    sparkContext.listenerBus.post(
+      SparkListenerSQLTextCaptured(System.currentTimeMillis(), substitutor.substitute(sqlText)))
     Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index e751ce39cd5d..c155722469e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -110,6 +110,16 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
     }
 
+    // Append the captured-SQL table only when at least one statement was captured.
+    val captured = sqlStore.capturedSqlTexts
+    if (captured.nonEmpty) {
+      val sqlTexts: NodeSeq = new SQLTextTable(
+        parent, "captured-sql-text-table",
+        s"Captured SQL sentences (${captured.size})", captured).toNodeSeq
+      content ++= sqlTexts
+    }
+
     UIUtils.headerSparkPage("SQL", summary ++ content, parent, Some(5000))
   }
 }
@@ -269,3 +278,34 @@ private[ui] class FailedExecutionTable(
 
   override protected def header: Seq[String] = baseHeader ++ Seq("Succeeded Job IDs", "Failed Job IDs")
 }
+
+/**
+ * Renders the SQL statements captured from `SparkSession.sql` as an HTML table
+ * on the SQL tab.
+ */
+private[ui] class SQLTextTable(
+    parent: SQLTab,
+    tableId: String,
+    tableName: String,
+    sqlTextDatas: Seq[SQLTextData]) {
+
+  def header: Seq[String] = Seq(
+    "Submission Time",
+    "SQL Text")
+
+  // One table row per captured statement. NOTE(review): the XML literals below
+  // were reconstructed — the extracted patch had the tags stripped; confirm
+  // against the original change.
+  def row(sqlTextData: SQLTextData): Seq[Node] = {
+    <tr>
+      <td>{UIUtils.formatDate(sqlTextData.submissionTime)}</td>
+      <td>{sqlTextData.sqlText}</td>
+    </tr>
+  }
+
+  def toNodeSeq: Seq[Node] = {
+    <div>
+      <h4>{tableName}</h4>
+      {UIUtils.listingTable[SQLTextData](
+        header, row(_), sqlTextDatas, id = Some(tableId))}
+    </div>
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 53fb9a0cc21c..7d572dbdc7ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.internal.Logging
@@ -45,6 +46,9 @@ class SQLAppStatusListener(
   // thread-safe.
   private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
   private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+  // NOTE(review): unlike the maps above this buffer is NOT thread-safe, and it
+  // is unbounded — listener-bus writes race with UI-thread reads, and a
+  // long-running application grows it without limit. Consider a bounded,
+  // synchronized store before merging.
+  val sqlTexts = new mutable.ArrayBuffer[SQLTextData]()
 
   // Returns true if this listener has no live data. Exposed for tests only.
private[sql] def noLiveData(): Boolean = { @@ -294,10 +296,16 @@ class SQLAppStatusListener( } } + private def onSQLTextCaptured(event: SparkListenerSQLTextCaptured): Unit = { + val SparkListenerSQLTextCaptured(submissionTime, sqlText) = event + sqlTexts += new SQLTextData(submissionTime, sqlText) + } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionStart => onExecutionStart(e) case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e) + case e: SparkListenerSQLTextCaptured => onSQLTextCaptured(e) case _ => // Ignore } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 9a76584717f4..2ae99741a9e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -71,6 +71,13 @@ class SQLAppStatusStore( def planGraph(executionId: Long): SparkPlanGraph = { store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() } + + def capturedSqlTexts: Seq[SQLTextData] = { + listener match { + case Some(l) => l.sqlTexts.toSeq + case None => Seq.empty[SQLTextData] + } + } } class SQLExecutionUIData( @@ -139,3 +146,7 @@ case class SQLPlanMetric( name: String, accumulatorId: Long, metricType: String) + +case class SQLTextData( + submissionTime: Long, + sqlText: String) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index b58b8c6d45e5..d81911a01d1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -41,6 +41,11 @@ case class SparkListenerSQLExecutionStart( 
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerSQLTextCaptured( + submissionTime: Long, + sqlText: String) extends SparkListenerEvent + /** * A message used to update SQL metric value for driver-side updates (which doesn't get reflected * automatically).