Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ class SparkSession private(
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
SQLExecution.setSqlText(sqlText)
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ object SQLExecution {

private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]()

private val executionIdToSqlText = new ConcurrentHashMap[Long, String]()

def setSqlText(sqlText: String): Unit = {
executionIdToSqlText.putIfAbsent(_nextExecutionId.get(), sqlText)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the executionId used here match the current execution? IIUC, the execution id is incremented in withNewExecutionId, and the one you used here mostly refers to the previous execution, please correct me if I'm wrong.

Copy link
Contributor Author

@LantaoJin LantaoJin Mar 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setSqlText is invoked before withNewExecutionId. First time _nextExecutionId is 0 by default, so setSqlText store (0, x) in map. When withNewExecutionId is invoked, the code val executionId = SQLExecution.nextExecutionId increase the execution id and return the previous execution id, 0. Then val sqlText = getSqlText(executionId) will return the sql text which 0 mapped, x. Next time when setSqlText is invoked, _nextExecutionId.get() return the increased id, 1. So the new sql text store in map (1, y).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, I see. Sorry I misunderstood it.

}

def getSqlText(executionId: Long): String = {
executionIdToSqlText.get(executionId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if this execution doesn't have SQL text?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shows nothing

}

def getQueryExecution(executionId: Long): QueryExecution = {
executionIdToQueryExecution.get(executionId)
}
Expand Down Expand Up @@ -63,16 +73,17 @@ object SQLExecution {
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val sqlText = getSqlText(executionId)
executionIdToQueryExecution.put(executionId, queryExecution)
try {
// sparkContext.getCallSite() would first try to pick up any call site that was previously
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
// streaming queries would give us call site like "run at <unknown>:0"
val callSite = sparkSession.sparkContext.getCallSite()

sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
System.currentTimeMillis(), sqlText))
try {
body
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging

summary ++
planVisualization(metrics, graph) ++
showSQLText(executionUIData.sqlText) ++
physicalPlanDescription(executionUIData.physicalPlanDescription)
}.getOrElse {
<div>No information to display for query {executionId}</div>
Expand Down Expand Up @@ -120,6 +121,25 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
private def jobURL(jobId: Long): String =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)

private def showSQLText(sqlText: String): Seq[Node] = {
<div>
<span style="cursor: pointer;" onclick="clickShowSQLText();">
<span id="sql-text-arrow" class="arrow-closed"></span>
<a>SQL text</a>
</span>
</div>
<div id="sql-text-details" style="display: none;">
<pre>{sqlText}</pre>
</div>
<script>
function clickShowSQLText() {{
$('#sql-text-details').toggle();
$('#sql-text-arrow').toggleClass('arrow-open').toggleClass('arrow-closed');
}}
</script>
<br/>
}

private def physicalPlanDescription(physicalPlanDescription: String): Seq[Node] = {
<div>
<span style="cursor: pointer;" onclick="clickPhysicalPlanDetails();">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class SQLAppStatusListener(

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) = event
physicalPlanDescription, sparkPlanInfo, time, sqlText) = event

def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
nodes.map {
Expand Down Expand Up @@ -265,6 +265,7 @@ class SQLAppStatusListener(
exec.physicalPlanDescription = physicalPlanDescription
exec.metrics = sqlPlanMetrics
exec.submissionTime = time
exec.sqlText = sqlText
update(exec)
}

Expand Down Expand Up @@ -351,6 +352,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var jobs = Map[Int, JobExecutionStatus]()
var stages = Set[Int]()
var driverAccumUpdates = Map[Long, Long]()
var sqlText: String = null

@volatile var metricsValues: Map[Long, String] = null

Expand All @@ -369,7 +371,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
completionTime,
jobs,
stages,
metricsValues)
metricsValues,
sqlText)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class SQLExecutionUIData(
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
val metricValues: Map[Long, String]) {
val metricValues: Map[Long, String],
val sqlText: String) {

@JsonIgnore @KVIndex("completionTime")
private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ case class SparkListenerSQLExecutionStart(
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long)
time: Long,
sqlText: String)
extends SparkListenerEvent

@DeveloperApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
| "metadata":{},
| "metrics":[]
| },
| "time":0
| "time":0,
| "sqlText":"select 1 as a"
|}
""".stripMargin
val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0, "select 1 as a")
assert(reconstructedEvent == expectedEvent)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
System.currentTimeMillis(),
"select 1 as a"))

listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -298,7 +299,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
System.currentTimeMillis(),
"select 1 as a"))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
Expand Down Expand Up @@ -327,7 +329,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
System.currentTimeMillis(),
"select 1 as a"))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
Expand Down Expand Up @@ -367,7 +370,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
System.currentTimeMillis(),
"select 1 as a"))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
Expand Down Expand Up @@ -396,7 +400,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
System.currentTimeMillis(),
"select 1 as a"))

var stageId = 0
def twoStageJob(jobId: Int): Unit = {
Expand Down Expand Up @@ -521,21 +526,24 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
val df = createTestDataFrame
// Start execution 1 and execution 2
time += 1
val sqlText = "select 1 as a"
listener.onOtherEvent(SparkListenerSQLExecutionStart(
1,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
time))
time,
sqlText))
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
2,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
time))
time,
sqlText))

// Stop execution 2 before execution 1
time += 1
Expand All @@ -551,7 +559,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
time))
time,
sqlText))
assert(statusStore.executionsCount === 2)
assert(statusStore.execution(2) === None)
}
Expand Down