Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ private[spark] object ToolTips {
dynamic allocation is enabled. The number of granted executors may exceed the limit
ephemerally when executors are being killed.
"""

// Tool tip text explaining how much of a query's SQL text is displayed and when it is blank.
val SQL_TEXT =
  Seq(
    "Shows 140 characters by default.",
    "Click \"+more\" to see more.",
    "Long texts are truncated to 1000 characters.",
    "Left blank when the query was not issued by SQL."
  ).mkString(" ")
}
23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ private[spark] object UIUtils extends Logging {
id: Option[String] = None,
headerClasses: Seq[String] = Seq.empty,
stripeRowsWithCss: Boolean = true,
sortable: Boolean = true): Seq[Node] = {
sortable: Boolean = true,
// If the tool tip is defined, Some(toolTipText, toolTipPosition), otherwise None.
headerToolTips: Seq[Option[(String, String)]] = Seq.empty): Seq[Node] = {

val listingTableClass = {
val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
Expand All @@ -317,6 +319,14 @@ private[spark] object UIUtils extends Logging {
}
}

/**
 * Looks up the tool tip configured for the header column at `index`.
 *
 * @param index zero-based position of the header column
 * @return the `(toolTipText, toolTipPosition)` pair for that column, or None when
 *         `headerToolTips` has no entry at `index` or the entry itself is None
 */
def getToolTip(index: Int): Option[(String, String)] = {
  // `lift` yields None for any out-of-range index (negative ones included) instead of throwing,
  // so callers may pass fewer tool tips than there are headers.
  headerToolTips.lift(index).flatten
}

val newlinesInHeader = headers.exists(_.contains("\n"))
def getHeaderContent(header: String): Seq[Node] = {
if (newlinesInHeader) {
Expand All @@ -330,7 +340,16 @@ private[spark] object UIUtils extends Logging {

val headerRow: Seq[Node] = {
headers.view.zipWithIndex.map { x =>
<th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
val toolTipOption = getToolTip(x._2)
if (toolTipOption.isEmpty) {
<th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
} else {
val toolTip = toolTipOption.get
// scalastyle:off line.size.limit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you shouldn't need this, scala can handle line wrapping for Node return values

<th width={colWidthAttr} class={getClass(x._2)} data-toggle="tooltip" title={toolTip._1} data-placement={toolTip._2}>{getHeaderContent(x._1)}</th>
// scalastyle:on line.size.limit
}

}
}
<table class={listingTableClass} id={id.map(Text.apply)}>
Expand Down
22 changes: 20 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,26 @@ private[sql] object Dataset {
new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
}

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
/**
 * Creates a [[DataFrame]] over `logicalPlan`, optionally recording the SQL statement behind it.
 *
 * @param sparkSession session whose `sessionState` builds the query execution
 * @param logicalPlan logical plan to wrap
 * @param sqlText original SQL statement, if any; defaults to None, in which case no text
 *                is recorded
 * @return a `Dataset[Row]` built from the analyzed plan's schema
 */
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, sqlText: Option[String] = None)
  : DataFrame = {

  // Record the original SQL text in the query execution so it can be shown in the web UI.
  // Cap its length so that very long statements cannot exhaust the memory of browsers or
  // web UI servers. The suffix is counted against the cap, so the recorded text is never
  // longer than `maxLength`.
  val maxLength = 1000
  val suffix = " ... (truncated)"
  val truncateLength = maxLength - suffix.length
  // `map` leaves None untouched. Calling `sqlText.get` here would throw
  // NoSuchElementException for every caller that relies on the default.
  val truncatedSqlText: Option[String] = sqlText.map { text =>
    if (text.length <= maxLength) {
      text
    } else {
      text.substring(0, truncateLength) + suffix
    }
  }

  val qe = sparkSession.sessionState.executePlan(logicalPlan, truncatedSqlText)
  qe.assertAnalyzed()
  new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ class SparkSession private(
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
// Pass the raw statement alongside the parsed plan so that it is recorded with the query.
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText), Some(sqlText))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
* While this is not a public class, we should avoid changing the function names for the sake of
* changing them, because a lot of developers use the feature for debugging.
*/
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
class QueryExecution(
val sparkSession: SparkSession,
val logical: LogicalPlan,
// Record the original sql text in the top logical plan for checking in the web UI.
val sqlText: Option[String] = None) {

// TODO: Move the planner and optimizer into here from SessionState.
protected def planner = sparkSession.sessionState.planner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object SQLExecution {

sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), queryExecution.sqlText,
System.currentTimeMillis()))
try {
body
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.xml.Node
import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.internal.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}

private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {

Expand Down Expand Up @@ -60,6 +60,10 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
function clickDetail(details) {{
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
}}
function clickMore(details) {{
details.parentNode.querySelector('.sql-abstract').classList.toggle('collapsed')
details.parentNode.querySelector('.sql-full').classList.toggle('collapsed')
}}
</script>
UIUtils.headerSparkPage("SQL", content, parent, Some(5000))
}
Expand All @@ -83,10 +87,13 @@ private[ui] abstract class ExecutionTable(

protected def header: Seq[String]

protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = {
protected def row(currentTime: Long, executionUIData: SQLExecutionUIData, showSqlText: Boolean)
: Seq[Node] = {
val submissionTime = executionUIData.submissionTime
val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime

val sqlText = executionUIData.sqlText.getOrElse("")

val runningJobs = executionUIData.runningJobs.map { jobId =>
<a href={jobURL(jobId)}>{jobId.toString}</a><br/>
}
Expand Down Expand Up @@ -124,6 +131,11 @@ private[ui] abstract class ExecutionTable(
{failedJobs}
</td>
}}
{if (showSqlText) {
<td>
{sqlTextCell(sqlText)}
</td>
}}
</tr>
}

Expand All @@ -146,11 +158,43 @@ private[ui] abstract class ExecutionTable(
<div>{desc} {details}</div>
}

private def sqlTextCell(sqlText: String): Seq[Node] = {
// Only show a limited number of characters of sqlText by default when it is too long
val maxLength = 140
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a button to copy full query text into clipboard?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary? The text can be easily highlighted and copied.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. for very long query that wouldn't fit as per this comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a "+more" button that expands to show the full query

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I understand you now, if we end up truncating the text at 1000 chars as proposed then yes this may be a useful feature, but if the text is available to copy wouldn't the size issue still exist?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean that it could be too long for clipboard? I don't think that would be an issue...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @felixcheung. Rereading the discussion today, I understand your suggestion better. However, very long queries are truncated not because they look unpleasant on the page, but because storing and transferring an unbounded string may take too many resources. Queries longer than 1000 chars are therefore truncated right after parsing, and the dropped text can never be recovered, so a clipboard button cannot solve this.

But still, I believe the clipboard is a good idea. I do think we could extend the limit to 10k chars instead of 1k if we add a clipboard later.


if (sqlText.length <= maxLength) {
<div>{sqlText}</div>
} else {
val sqlAbstractText = sqlText.substring(0, maxLength) + " ..."
<div>
<div class="stage-details sql-abstract">
{sqlAbstractText}
</div>
<div class="stage-details sql-full collapsed">
{sqlText}
</div>
<span onclick="clickMore(this)" class="expand-details">
+more
</span>
</div>
}
}

/**
 * Renders this table as a titled `<div>` wrapping a `UIUtils.listingTable`.
 *
 * A "SQL Text" column, with its tool tip, is appended only when at least one execution in
 * `executionUIDatas` has a defined `sqlText`.
 */
def toNodeSeq: Seq[Node] = {
// True when any listed execution carries SQL text.
val showSqlText = executionUIDatas.exists(_.sqlText.isDefined)
val headerFull = header ++ {if (showSqlText) Seq("SQL Text") else Seq.empty}
val sqlTextToolTip = {if (showSqlText) {
Seq(Some(ToolTips.SQL_TEXT, "top"))
} else {
Seq.empty
}}
// One None per regular header keeps the tool tip aligned with the appended "SQL Text" column.
val headerToolTips: Seq[Option[(String, String)]] = header.map(_ => None) ++ sqlTextToolTip

<div>
<h4>{tableName}</h4>
{UIUtils.listingTable[SQLExecutionUIData](
header, row(currentTime, _), executionUIDatas, id = Some(tableId))}
headerFull, row(currentTime, _, showSqlText), executionUIDatas, id = Some(tableId),
headerToolTips = headerToolTips)}
</div>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case class SparkListenerSQLExecutionStart(
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
sqlText: Option[String],
time: Long)
extends SparkListenerEvent

Expand Down Expand Up @@ -268,7 +269,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) =>
physicalPlanDescription, sparkPlanInfo, sqlText, time) =>
val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
node.metrics.map(metric => metric.accumulatorId -> metric)
Expand All @@ -280,6 +281,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
physicalPlanDescription,
physicalPlanGraph,
sqlPlanMetrics.toMap,
sqlText,
time)
synchronized {
activeExecutions(executionId) = executionUIData
Expand Down Expand Up @@ -428,6 +430,7 @@ private[ui] class SQLExecutionUIData(
val physicalPlanDescription: String,
val physicalPlanGraph: SparkPlanGraph,
val accumulatorMetrics: Map[Long, SQLPlanMetric],
val sqlText: Option[String],
val submissionTime: Long) {

var completionTime: Option[Long] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ abstract class BaseSessionStateBuilder(
new QueryExecution(session, plan)
}

/**
 * Creates a [[QueryExecution]] for `plan` in this builder's session.
 *
 * @param plan logical plan to execute
 * @param sqlText optional SQL text, handed unchanged to the `QueryExecution` constructor
 */
protected def createQueryExecution(
    plan: LogicalPlan,
    sqlText: Option[String]): QueryExecution =
  new QueryExecution(session, plan, sqlText)

/**
* Interface to start and stop streaming queries.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private[sql] class SessionState(
val streamingQueryManager: StreamingQueryManager,
val listenerManager: ExecutionListenerManager,
val resourceLoader: SessionResourceLoader,
createQueryExecution: LogicalPlan => QueryExecution,
createQueryExecution: (LogicalPlan, Option[String]) => QueryExecution,
createClone: (SparkSession, SessionState) => SessionState) {

def newHadoopConf(): Configuration = SessionState.newHadoopConf(
Expand All @@ -93,7 +93,8 @@ private[sql] class SessionState(
// Helper methods, partially leftover from pre-2.0 days
// ------------------------------------------------------

def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
/**
 * Builds the [[QueryExecution]] for `plan` by delegating to `createQueryExecution`.
 *
 * @param plan logical plan to execute
 * @param sqlText optional SQL text forwarded as-is; defaults to None
 */
def executePlan(
    plan: LogicalPlan,
    sqlText: Option[String] = None): QueryExecution = {
  createQueryExecution(plan, sqlText)
}

def refreshTable(tableName: String): Unit = {
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))

val executionUIData = listener.executionIdToData(0)
Expand Down Expand Up @@ -259,6 +260,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -289,6 +291,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -330,6 +333,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -369,7 +373,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
// These are largely just boilerplate unrelated to what we're trying to test.
val df = createTestDataFrame
val executionStart = SparkListenerSQLExecutionStart(
0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), None, 0)
val stageInfo = createStageInfo(0, 0)
val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
Expand Down