diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 766cc65084f0..51f6e67d1a2c 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -99,4 +99,9 @@ private[spark] object ToolTips {
     dynamic allocation is enabled. The number of granted executors may exceed the limit
     ephemerally when executors are being killed.
     """
+
+  val SQL_TEXT =
+    """Shows the first 140 characters by default. Click "+more" to see the rest. Long texts are
+      |truncated to 1000 characters. Left blank when the query was not issued via SQL."""
+      .stripMargin.replaceAll("\n", " ")
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 2610f673d27f..2bdb241152ed 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -296,7 +296,9 @@ private[spark] object UIUtils extends Logging {
       id: Option[String] = None,
       headerClasses: Seq[String] = Seq.empty,
       stripeRowsWithCss: Boolean = true,
-      sortable: Boolean = true): Seq[Node] = {
+      sortable: Boolean = true,
+      // If the tooltip is defined, Some((toolTipText, toolTipPosition)); otherwise None.
+      headerToolTips: Seq[Option[(String, String)]] = Seq.empty): Seq[Node] = {
     val listingTableClass = {
       val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
@@ -317,6 +319,14 @@ private[spark] object UIUtils extends Logging {
       }
     }
 
+    def getToolTip(index: Int): Option[(String, String)] = {
+      if (index < headerToolTips.size) {
+        headerToolTips(index)
+      } else {
+        None
+      }
+    }
+
     val newlinesInHeader = headers.exists(_.contains("\n"))
     def getHeaderContent(header: String): Seq[Node] = {
       if (newlinesInHeader) {
@@ -330,7 +340,16 @@ private[spark] object UIUtils extends Logging {
 
     val headerRow: Seq[Node] = {
       headers.view.zipWithIndex.map { x =>
-        <th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
+        val toolTipOption = getToolTip(x._2)
+        if (toolTipOption.isEmpty) {
+          <th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
+        } else {
+          val toolTip = toolTipOption.get
+          // scalastyle:off line.size.limit
+          <th width={colWidthAttr} class={getClass(x._2)} data-toggle="tooltip" data-placement={toolTip._2} title={toolTip._1}>{getHeaderContent(x._1)}</th>
+          // scalastyle:on line.size.limit
+        }
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0e7415890e21..b282bfc44903 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,8 +61,23 @@ private[sql] object Dataset {
     new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
   }
 
-  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
-    val qe = sparkSession.sessionState.executePlan(logicalPlan)
+  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, sqlText: Option[String] = None)
+    : DataFrame = {
+
+    // Record the original SQL text in the query execution so it can be shown in the web UI.
+    // Truncate the text so a very long query cannot exhaust memory in browsers or the UI server.
+    val maxLength = 1000
+    val suffix = " ... (truncated)"
+    val truncateLength = maxLength - suffix.length
+    val truncatedSqlText: Option[String] = sqlText.map { text =>
+      if (text.length <= maxLength) {
+        text
+      } else {
+        text.substring(0, truncateLength) + suffix
+      }
+    }
+
+    val qe = sparkSession.sessionState.executePlan(logicalPlan, truncatedSqlText)
     qe.assertAnalyzed()
     new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d2bf35071193..ece2ec6173f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -620,7 +620,7 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
-    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
+    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText), Some(sqlText))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 2e05e5d65923..fe9af812bf44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
 * While this is not a public class, we should avoid changing the function names for the sake of
 * changing them, because a lot of developers use the feature for debugging.
 */
-class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
+class QueryExecution(
+    val sparkSession: SparkSession,
+    val logical: LogicalPlan,
+    // The original SQL text for the top-level logical plan, recorded so the web UI can show it.
+    val sqlText: Option[String] = None) {
 
   // TODO: Move the planner and optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
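For reference, the truncation added to `Dataset.ofRows` above behaves like the standalone sketch below. The helper name `truncateSqlText` and the enclosing object are illustrative only; the constants mirror the patch.

```scala
// Illustrative restatement of the ofRows truncation logic (not part of the patch).
object SqlTextTruncation {
  def truncateSqlText(sqlText: Option[String], maxLength: Int = 1000): Option[String] = {
    val suffix = " ... (truncated)"
    val truncateLength = maxLength - suffix.length
    sqlText.map { text =>
      if (text.length <= maxLength) text
      else text.substring(0, truncateLength) + suffix
    }
  }
}

// A 5000-character query keeps its first 984 characters (1000 minus the
// 16-character suffix) and ends with " ... (truncated)". None passes through
// untouched, which is why ofRows must not call sqlText.get up front.
```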
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index be35916e3447..8492eb5179fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -60,7 +60,8 @@ object SQLExecution {
 
     sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
       executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+      SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), queryExecution.sqlText,
+      System.currentTimeMillis()))
     try {
       body
     } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index e96fb9f7550a..1801c4895bbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -25,7 +25,7 @@ import scala.xml.Node
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 
 private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
 
@@ -60,6 +60,10 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
         function clickDetail(details) {{
           details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
         }}
+        function clickMore(details) {{
+          details.parentNode.querySelector('.sql-abstract').classList.toggle('collapsed')
+          details.parentNode.querySelector('.sql-full').classList.toggle('collapsed')
+        }}
       </script>
 
     UIUtils.headerSparkPage("SQL", content, parent, Some(5000))
   }
@@ -83,10 +87,13 @@ private[ui] abstract class ExecutionTable(
 
   protected def header: Seq[String]
 
-  protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = {
+  protected def row(currentTime: Long, executionUIData: SQLExecutionUIData, showSqlText: Boolean)
+    : Seq[Node] = {
     val submissionTime = executionUIData.submissionTime
     val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime
 
+    val sqlText = executionUIData.sqlText.getOrElse("")
+
     val runningJobs = executionUIData.runningJobs.map { jobId =>
       <a href={jobURL(jobId)}>{jobId.toString}</a>
     }
@@ -124,6 +131,11 @@ private[ui] abstract class ExecutionTable(
         {failedJobs}
       </td>
       }}
+      {if (showSqlText) {
+        <td>
+          {sqlTextCell(sqlText)}
+        </td>
+      }}
     </tr>
   }
@@ -146,11 +158,43 @@ private[ui] abstract class ExecutionTable(
       <div class="description-input">{desc} {details}</div>
   }
 
+  private def sqlTextCell(sqlText: String): Seq[Node] = {
+    // Only show a limited number of characters of sqlText by default when it is too long.
+    val maxLength = 140
+
+    if (sqlText.length <= maxLength) {
+      <div>{sqlText}</div>
+    } else {
+      val sqlAbstractText = sqlText.substring(0, maxLength) + " ..."
+      <div>
+        <div class="sql-abstract">
+          {sqlAbstractText}
+        </div>
+        <div class="sql-full collapsed">
+          {sqlText}
+        </div>
+        <a onclick="clickMore(this)" class="expand-details">
+          +more
+        </a>
+      </div>
+    }
+  }
+
   def toNodeSeq: Seq[Node] = {
+    val showSqlText = executionUIDatas.exists(_.sqlText.isDefined)
+    val headerFull = header ++ (if (showSqlText) Seq("SQL Text") else Seq.empty)
+    val sqlTextToolTip = if (showSqlText) {
+      Seq(Some((ToolTips.SQL_TEXT, "top")))
+    } else {
+      Seq.empty
+    }
+    val headerToolTips: Seq[Option[(String, String)]] = header.map(_ => None) ++ sqlTextToolTip
+
     <div>
       <h4>{tableName}</h4>
       {UIUtils.listingTable[SQLExecutionUIData](
-        header, row(currentTime, _), executionUIDatas, id = Some(tableId))}
+        headerFull, row(currentTime, _, showSqlText), executionUIDatas, id = Some(tableId),
+        headerToolTips = headerToolTips)}
     </div>
   }
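To show how the new `headerToolTips` parameter lines up positionally with `headers`, here is a hedged usage sketch of `UIUtils.listingTable`. The row type, render function, and table id are invented for illustration, and the call would have to live under the `org.apache.spark` package since `UIUtils` is `private[spark]`:

```scala
import scala.xml.Node
import org.apache.spark.ui.{ToolTips, UIUtils}

// Columns paired with None render as plain <th>; only "SQL Text" gets a
// tooltip, placed above the header, mirroring the AllExecutionsPage change.
val headers = Seq("ID", "SQL Text")
val headerToolTips: Seq[Option[(String, String)]] =
  Seq(None, Some((ToolTips.SQL_TEXT, "top")))

def renderRow(row: (Long, String)): Seq[Node] =
  <tr><td>{row._1}</td><td>{row._2}</td></tr>

val table: Seq[Node] = UIUtils.listingTable(
  headers, renderRow, Seq((1L, "SELECT 1")), id = Some("example-table"),
  headerToolTips = headerToolTips)
```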
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b4a91230a001..a23f698bf0bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -40,6 +40,7 @@ case class SparkListenerSQLExecutionStart(
     details: String,
     physicalPlanDescription: String,
     sparkPlanInfo: SparkPlanInfo,
+    sqlText: Option[String],
     time: Long)
   extends SparkListenerEvent
 
@@ -268,7 +269,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case SparkListenerSQLExecutionStart(executionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time) =>
+      physicalPlanDescription, sparkPlanInfo, sqlText, time) =>
       val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
       val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
         node.metrics.map(metric => metric.accumulatorId -> metric)
@@ -280,6 +281,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
         physicalPlanDescription,
         physicalPlanGraph,
         sqlPlanMetrics.toMap,
+        sqlText,
         time)
       synchronized {
         activeExecutions(executionId) = executionUIData
@@ -428,6 +430,7 @@ private[ui] class SQLExecutionUIData(
     val physicalPlanDescription: String,
     val physicalPlanGraph: SparkPlanGraph,
     val accumulatorMetrics: Map[Long, SQLPlanMetric],
+    val sqlText: Option[String],
     val submissionTime: Long) {
 
   var completionTime: Option[Long] = None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 2a801d87b12e..e9b07e9dcd3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -253,6 +253,11 @@ abstract class BaseSessionStateBuilder(
     new QueryExecution(session, plan)
   }
 
+  protected def createQueryExecution(plan: LogicalPlan, sqlText: Option[String])
+    : QueryExecution = {
+    new QueryExecution(session, plan, sqlText)
+  }
+
   /**
    * Interface to start and stop streaming queries.
   */
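Because `SparkListenerSQLExecutionStart` now carries the SQL text, external listeners can observe it too. A minimal sketch, assuming a running `SparkContext`; the listener class and the logging side effect are illustrative, not part of the patch:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

class SqlTextListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      // sqlText is None for executions that did not originate from SparkSession.sql.
      e.sqlText.foreach(text => println(s"execution ${e.executionId}: $text"))
    case _ => // ignore other events
  }
}

// Register it with: spark.sparkContext.addSparkListener(new SqlTextListener)
```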
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 1b341a12fc60..1af2aff5ebe6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -67,7 +67,7 @@ private[sql] class SessionState(
     val streamingQueryManager: StreamingQueryManager,
     val listenerManager: ExecutionListenerManager,
     val resourceLoader: SessionResourceLoader,
-    createQueryExecution: LogicalPlan => QueryExecution,
+    createQueryExecution: (LogicalPlan, Option[String]) => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState) {
 
   def newHadoopConf(): Configuration = SessionState.newHadoopConf(
@@ -93,7 +93,8 @@ private[sql] class SessionState(
   // Helper methods, partially leftover from pre-2.0 days
   // ------------------------------------------------------
 
-  def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
+  def executePlan(plan: LogicalPlan, sqlText: Option[String] = None): QueryExecution =
+    createQueryExecution(plan, sqlText)
 
   def refreshTable(tableName: String): Unit = {
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index e6cd41e4facf..4a9dee4355ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -123,6 +123,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      None,
       System.currentTimeMillis()))
 
     val executionUIData = listener.executionIdToData(0)
@@ -259,6 +260,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      None,
       System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
@@ -289,6 +291,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      None,
       System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
@@ -330,6 +333,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      None,
       System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
@@ -369,7 +373,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
     // These are largely just boilerplate unrelated to what we're trying to test.
     val df = createTestDataFrame
     val executionStart = SparkListenerSQLExecutionStart(
-      0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
+      0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), None, 0)
     val stageInfo = createStageInfo(0, 0)
     val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
     val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
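End to end, the plumbing this patch threads through `SparkSession.sql`, `Dataset.ofRows`, `SessionState.executePlan`, and the listener bus can be exercised as below; the local session settings and queries are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sql-text-demo")
  .getOrCreate()

// Issued through the SQL API, so the execution start event carries
// Some("SELECT 1 AS id") and the SQL tab renders the "SQL Text" column.
spark.sql("SELECT 1 AS id").collect()

// A DataFrame-only pipeline takes the default sqlText = None; its cell stays
// blank, and if no execution defines a text, the column is hidden entirely.
spark.range(10).filter("id > 5").collect()

spark.stop()
```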