Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SparkListenerSQLTextCaptured
import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
Expand Down Expand Up @@ -146,6 +147,8 @@ class SparkSession private(
}
}

lazy private val substitutor = new VariableSubstitution(sessionState.conf)

/**
* A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
*
Expand Down Expand Up @@ -635,6 +638,9 @@ class SparkSession private(
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
sparkContext.listenerBus.post(
SparkListenerSQLTextCaptured(System.currentTimeMillis(), substitutor.substitute(sqlText))
)
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
}
</ul>
</div>

val captured = sqlStore.capturedSqlTexts
if (captured.nonEmpty) {
val sqlTexts: NodeSeq = new SQLTextTable(
parent, "captured-sql-text-table",
s"Captured SQL sentences (${captured.size})", captured).toNodeSeq
content ++= sqlTexts
}

UIUtils.headerSparkPage("SQL", summary ++ content, parent, Some(5000))
}
}
Expand Down Expand Up @@ -269,3 +278,33 @@ private[ui] class FailedExecutionTable(
override protected def header: Seq[String] =
baseHeader ++ Seq("Succeeded Job IDs", "Failed Job IDs")
}

private[ui] class SQLTextTable(
parent: SQLTab,
tableId: String,
tableName: String,
sqlTextDatas: Seq[SQLTextData]) {

def header: Seq[String] = Seq(
"Submission Time",
"SQL Text")

def row(sqlTextData: SQLTextData): Seq[Node] = {
<tr>
<td sorttable_customkey={sqlTextData.submissionTime.toString}>
{UIUtils.formatDate(sqlTextData.submissionTime)}
</td>
<td>
{sqlTextData.sqlText}
</td>
</tr>
}

def toNodeSeq: Seq[Node] = {
<div>
<h4>{tableName}</h4>
{UIUtils.listingTable[SQLTextData](
header, row(_), sqlTextDatas, id = Some(tableId))}
</div>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.internal.Logging
Expand All @@ -45,6 +46,7 @@ class SQLAppStatusListener(
// thread-safe.
private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
val sqlTexts = new mutable.ArrayBuffer[SQLTextData]()

// Returns true if this listener has no live data. Exposed for tests only.
private[sql] def noLiveData(): Boolean = {
Expand Down Expand Up @@ -294,10 +296,16 @@ class SQLAppStatusListener(
}
}

private def onSQLTextCaptured(event: SparkListenerSQLTextCaptured): Unit = {
val SparkListenerSQLTextCaptured(submissionTime, sqlText) = event
sqlTexts += new SQLTextData(submissionTime, sqlText)
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
case e: SparkListenerSQLTextCaptured => onSQLTextCaptured(e)
case _ => // Ignore
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ class SQLAppStatusStore(
def planGraph(executionId: Long): SparkPlanGraph = {
store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
}

def capturedSqlTexts: Seq[SQLTextData] = {
listener match {
case Some(l) => l.sqlTexts.toSeq
case None => Seq.empty[SQLTextData]
}
}
}

class SQLExecutionUIData(
Expand Down Expand Up @@ -139,3 +146,7 @@ case class SQLPlanMetric(
name: String,
accumulatorId: Long,
metricType: String)

case class SQLTextData(
submissionTime: Long,
sqlText: String)
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ case class SparkListenerSQLExecutionStart(
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSQLTextCaptured(
submissionTime: Long,
sqlText: String) extends SparkListenerEvent

/**
* A message used to update SQL metric value for driver-side updates (which doesn't get reflected
* automatically).
Expand Down