@@ -651,7 +651,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
try {
val start = System.nanoTime()
// call `QueryExecution.toRDD` to trigger the execution of commands.
SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
SQLExecution.withNewExecutionId(session, qe, ds.sqlText)(qe.toRdd)
val end = System.nanoTime()
session.listenerManager.onSuccess(name, qe, end - start)
} catch {
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (44 additions, 26 deletions)
@@ -61,18 +61,20 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils

private[sql] object Dataset {
def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
def apply[T: Encoder](sparkSession: SparkSession,
logicalPlan: LogicalPlan, sqlText: String = ""): Dataset[T] = {
val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]], sqlText)
// Eagerly bind the encoder so we verify that the encoder matches the underlying
// schema. The user will get an error if this is not the case.
dataset.deserializer
dataset
}

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
def ofRows(sparkSession: SparkSession,
logicalPlan: LogicalPlan, sqlText: String = ""): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema), sqlText)
}
}

@@ -166,20 +168,28 @@ private[sql] object Dataset {
class Dataset[T] private[sql](
@transient val sparkSession: SparkSession,
@DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
encoder: Encoder[T],
val sqlText: String = "")
Contributor:

What's the exact rule you defined to decide whether or not we should propagate the SQL text?

Contributor:

And how does the SQL shell execute commands like SELECT * FROM ...? Does it display all the rows, or add a LIMIT before displaying? Generally we should not propagate the SQL text: a new DataFrame usually means the plan has changed, so the SQL text is not accurate anymore.

Contributor Author (@LantaoJin), Mar 20, 2018:

Thanks for your review. I agree with this comment. Before the discussion, let me reproduce the scenario our company met. Team A developed a framework that submits an application with the SQL statements in a file:

spark-submit --master yarn-cluster --class com.ebay.SQLFramework -s biz.sql

In biz.sql there are many SQL statements like:

create or replace temporary view view_a select xx from table ${old_db}.table_a where dt=${check_date};
insert overwrite table ${new_db}.table_a select xx from view_a join ${new_db}.table_b;
...

There is no case like:

val df = spark.sql("xxxxx")
spark.range(10).collect()
df.filter(..).count()

Team B (Platform) needs to capture the SQL statements that are really executed across the whole cluster, because the SQL files from Team A contain many variables. A better way is to record the real SQL statement in the event log.

OK, back to the discussion. The original purpose is to display the SQL statement the user inputs. spark.range(10).collect() isn't a SQL statement the user inputs, and neither is df.filter(..).count(); only "xxxxx" is. So I have two proposals and a further thought.

  1. Change the display behavior to show only SQL that can trigger an action, like "create table", "insert overwrite", etc., and ignore the select statements. That way the SQL text is no longer propagated, and the test case above won't show anything in the SQL UI. The UI would also show "SQL text which triggers this execution" instead of "SQL text".
  2. Add a SQLCommandEvent and post an event with the SQL statement in SparkSession.sql(); the EventLoggingListener then just logs it to the event log. I am not sure whether we could still show the SQL text in the UI this way. (A rough sketch of this option follows this comment.)

Furthermore, what about opening another ticket to add a command option --sqlfile biz.sql to spark-submit? biz.sql must be a file consisting of SQL statements. Based on that implementation, not only client mode but also cluster mode could use pure SQL.

What do you think? @cloud-fan
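A minimal sketch of what proposal 2 could look like (not what this PR implements). SQLCommandEvent, SqlWithEvent and SQLCommandLogger are hypothetical names; the posting side assumes access to the private[spark] listenerBus, which SparkSession.sql has because it lives inside Spark, mirroring how SparkListenerSQLExecutionStart is posted in SQLExecution below.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical event carrying the raw SQL statement; not part of Spark.
case class SQLCommandEvent(sqlText: String, time: Long) extends SparkListenerEvent

object SqlWithEvent {
  // What SparkSession.sql() could do under proposal 2. listenerBus is
  // private[spark], so this only compiles from inside Spark's own packages,
  // which is where SparkSession.sql() is defined.
  def sql(spark: SparkSession, sqlText: String): DataFrame = {
    spark.sparkContext.listenerBus.post(SQLCommandEvent(sqlText, System.currentTimeMillis()))
    spark.sql(sqlText)
  }
}

// Consuming side: the event log records events whose logEvent flag is true
// (the default), and a custom listener registered via spark.extraListeners
// can also react to the event directly.
class SQLCommandLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SQLCommandEvent(text, time) => println(s"[$time] SQL submitted: $text")
    case _ => // ignore everything else
  }
}
```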

Contributor:

spark-submit --master yarn-cluster --class com.ebay.SQLFramework -s biz.sql

How does com.ebay.SQLFramework process the SQL file? Does it just call spark.sql(xxxx).show, or something else?

Contributor Author:

Your speculation is almost right. It first calls val df = spark.sql(...), then pattern-matches the SQL text into three types: count, limit and other. If count, it invokes df.showString(2,20); if limit, it just invokes df.limit(1).foreach; the last type, other, does nothing.
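For concreteness, a rough sketch of the dispatch described above; com.ebay.SQLFramework is not public, so the object name and the string-matching rules are assumptions based only on this comment.

```scala
import org.apache.spark.sql.SparkSession

object SqlFileRunner {
  // Classify each statement the way the comment describes: count, limit, or other.
  def runStatement(spark: SparkSession, statement: String): Unit = {
    val df = spark.sql(statement)
    val lower = statement.toLowerCase
    if (lower.contains("count(")) {
      // showString(2, 20) is private[sql]; the public equivalent is show(2, 20).
      df.show(2, 20)
    } else if (lower.contains(" limit ")) {
      df.limit(1).foreach(_ => ())
    } else {
      // "other": typically INSERT OVERWRITE / CREATE VIEW, which spark.sql()
      // already ran eagerly, so there is nothing more to do here.
    }
  }

  // Split a biz.sql-style file on ';' and run each statement in order.
  def runFile(spark: SparkSession, sqlFileContent: String): Unit = {
    sqlFileContent.split(";").map(_.trim).filter(_.nonEmpty)
      .foreach(runStatement(spark, _))
  }
}
```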

extends Serializable {

queryExecution.assertAnalyzed()

// Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure
// you wrap it with `withNewExecutionId` if this actions doesn't call other action.

def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
def this(sparkSession: SparkSession, logicalPlan: LogicalPlan,
encoder: Encoder[T]) = {
this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, "")
}

def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sqlContext.sparkSession, logicalPlan, encoder)
def this(sparkSession: SparkSession, logicalPlan: LogicalPlan,
encoder: Encoder[T], sqlText: String) = {
this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder, sqlText)
}

def this(sqlContext: SQLContext, logicalPlan: LogicalPlan,
encoder: Encoder[T], sqlText: String) = {
this(sqlContext.sparkSession, logicalPlan, encoder, sqlText)
}

@transient private[sql] val logicalPlan: LogicalPlan = {
@@ -390,7 +400,8 @@ class Dataset[T] private[sql](
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
def toDF(): DataFrame =
new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema), sqlText)

/**
* :: Experimental ::
@@ -622,7 +633,8 @@ class Dataset[T] private[sql](
outputPartitioning,
physicalPlan.outputOrdering,
isStreaming
)(sparkSession)).as[T]
)(sparkSession),
sqlText).as[T]
}

/**
@@ -1364,10 +1376,11 @@ class Dataset[T] private[sql](
planWithBarrier)

if (encoder.flat) {
new Dataset[U1](sparkSession, project, encoder)
new Dataset[U1](sparkSession, project, encoder, sqlText)
} else {
// Flattens inner fields of U1
new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder)).map(_._1)
new Dataset[Tuple1[U1]](sparkSession, project, ExpressionEncoder.tuple(encoder), sqlText)
.map(_._1)
}
}

@@ -1381,7 +1394,7 @@ class Dataset[T] private[sql](
val namedColumns =
columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders), sqlText)
}

/**
@@ -2023,7 +2036,7 @@ class Dataset[T] private[sql](
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
new Dataset[T](
sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder)
sparkSession, Sample(x(0), x(1), withReplacement = false, seed, plan), encoder, sqlText)
}.toArray
}

@@ -2583,7 +2596,7 @@ class Dataset[T] private[sql](
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, planWithBarrier),
implicitly[Encoder[U]])
implicitly[Encoder[U]], sqlText)
}

/**
Expand Down Expand Up @@ -2613,7 +2626,8 @@ class Dataset[T] private[sql](
val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
Dataset.ofRows(
sparkSession,
MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier),
sqlText)
}

/**
@@ -2766,7 +2780,7 @@ class Dataset[T] private[sql](
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
def count(): Long = withAction("count", groupBy().count().queryExecution, true) { plan =>
plan.executeCollect().head.getLong(0)
}

@@ -3222,7 +3236,7 @@ class Dataset[T] private[sql](
* an execution.
*/
private def withNewExecutionId[U](body: => U): U = {
SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
SQLExecution.withNewExecutionId(sparkSession, queryExecution, sqlText)(body)
}

/**
@@ -3231,7 +3245,7 @@ class Dataset[T] private[sql](
* reset.
*/
private def withNewRDDExecutionId[U](body: => U): U = {
SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution, sqlText) {
rddQueryExecution.executedPlan.foreach { plan =>
plan.resetMetrics()
}
@@ -3243,13 +3257,17 @@ class Dataset[T] private[sql](
* Wrap a Dataset action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
*/
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
private def withAction[U](
name: String,
qe: QueryExecution,
hideSqlText: Boolean = false)(action: SparkPlan => U) = {
try {
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
val result = SQLExecution.withNewExecutionId(sparkSession, qe,
if (hideSqlText) "" else sqlText) {
action(qe.executedPlan)
}
val end = System.nanoTime()
@@ -3292,21 +3310,21 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
Dataset.ofRows(sparkSession, logicalPlan, sqlText)
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
Dataset(sparkSession, logicalPlan, sqlText)
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
// Set operators widen types (change the schema), so we cannot reuse the row encoder.
Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
Dataset.ofRows(sparkSession, logicalPlan, sqlText).asInstanceOf[Dataset[U]]
} else {
Dataset(sparkSession, logicalPlan)
Dataset(sparkSession, logicalPlan, sqlText)
}
}

@@ -146,6 +146,8 @@ class SparkSession private(
}
}

lazy private val substitutor = new VariableSubstitution(sessionState.conf)

/**
* A wrapped version of this session in the form of a [[SQLContext]], for backward compatibility.
*
@@ -635,7 +637,8 @@ class SparkSession private(
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText),
substitutor.substitute(sqlText))
Member:

Hi, @LantaoJin. What you need is just grabbing the initial SQL text here; you can use a Spark extension. Please refer to the Spark Atlas Connector for sample code.

Member:

You may want to refactor this PR into a ParserExtension part and a UI part. I think that will be less intrusive than the current implementation.
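Not what this PR does, but a rough sketch of the extension route being suggested, assuming Spark 2.3's SparkSessionExtensions.injectParser and ParserInterface; the class names are made up, and where the captured text goes (log, event, store) is left open.

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// A delegating parser that records every SQL text it sees, then hands the
// string to the session's normal parser.
class SqlTextCapturingParser(delegate: ParserInterface) extends ParserInterface {
  override def parsePlan(sqlText: String): LogicalPlan = {
    println(s"SQL submitted: $sqlText") // or post a custom event / write to a store
    delegate.parsePlan(sqlText)
  }
  override def parseExpression(sqlText: String): Expression =
    delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)
  override def parseTableSchema(sqlText: String): StructType =
    delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String): DataType =
    delegate.parseDataType(sqlText)
}

// Enabled with --conf spark.sql.extensions=<fully qualified name of this class>.
class SqlTextExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser((_: SparkSession, parser: ParserInterface) =>
      new SqlTextCapturingParser(parser))
  }
}
```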

Member (@dongjoon-hyun), Mar 22, 2018:

BTW, in general, the initial SQL text easily becomes meaningless once other operations are added. In your example, the following case shows a misleading and wrong SQL statement instead of the SQL plan that was really executed:

val df = spark.sql("xxxxx")
df.filter(...).collect() // shows sql text "xxxxx"

As another example, please try the following. It will show you select a,b from t1.

scala> spark.sql("select a,b from t1").select("a").show
+---+
|  a|
+---+
|  1|
+---+

Contributor Author:

"the following case shows a misleading and wrong SQL statement instead of the SQL plan that was really executed"

Yes, we know this; that is why the current implementation, which binds the SQL text to the DataFrame, is not good.

}

/**
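Worth noting in the hunk above: sql() stores substitutor.substitute(sqlText) rather than the raw string, so references like ${old_db} from the biz.sql example are resolved in the recorded text. A small sketch of that behaviour, assuming the variable can be looked up through the session's SQLConf (which providers VariableSubstitution consults depends on configuration):

```scala
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

val conf = new SQLConf
// Assumed setup: make ${old_db} resolvable. In a real session this would come
// from SET old_db=... or the session configuration rather than a bare SQLConf.
conf.setConfString("old_db", "prod_db")

val substitutor = new VariableSubstitution(conf)

// With spark.sql.variable.substitute enabled (the default), ${...} references
// are expanded, so the text attached to the execution names the real database.
val recorded = substitutor.substitute("insert overwrite table ${old_db}.table_a select 1")
// recorded: "insert overwrite table prod_db.table_a select 1"
```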
@@ -58,7 +58,8 @@ object SQLExecution {
*/
def withNewExecutionId[T](
sparkSession: SparkSession,
queryExecution: QueryExecution)(body: => T): T = {
queryExecution: QueryExecution,
sqlText: String = "")(body: => T): T = {
val sc = sparkSession.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
@@ -69,10 +70,10 @@ object SQLExecution {
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
// streaming queries would give us call site like "run at <unknown>:0"
val callSite = sparkSession.sparkContext.getCallSite()

sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
System.currentTimeMillis(), sqlText))
try {
body
} finally {
@@ -503,7 +503,7 @@ class MicroBatchExecution(
new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))

reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution, nextBatch.sqlText) {
sink match {
case s: Sink => s.addBatch(currentBatchId, nextBatch)
case _: StreamWriteSupport =>
@@ -78,6 +78,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging

summary ++
planVisualization(metrics, graph) ++
showSQLText(executionUIData.sqlText) ++
physicalPlanDescription(executionUIData.physicalPlanDescription)
}.getOrElse {
<div>No information to display for query {executionId}</div>
@@ -120,6 +121,25 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
private def jobURL(jobId: Long): String =
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)

private def showSQLText(sqlText: String): Seq[Node] = {
<div>
<span style="cursor: pointer;" onclick="clickShowSQLText();">
<span id="sql-text-arrow" class="arrow-closed"></span>
<a>SQL text</a>
</span>
</div>
<div id="sql-text-details" style="display: none;">
<pre>{sqlText}</pre>
</div>
<script>
function clickShowSQLText() {{
$('#sql-text-details').toggle();
$('#sql-text-arrow').toggleClass('arrow-open').toggleClass('arrow-closed');
}}
</script>
<br/>
}

private def physicalPlanDescription(physicalPlanDescription: String): Seq[Node] = {
<div>
<span style="cursor: pointer;" onclick="clickPhysicalPlanDetails();">
@@ -230,7 +230,7 @@ class SQLAppStatusListener(

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) = event
physicalPlanDescription, sparkPlanInfo, time, sqlText) = event

def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
nodes.map {
@@ -265,6 +265,7 @@ class SQLAppStatusListener(
exec.physicalPlanDescription = physicalPlanDescription
exec.metrics = sqlPlanMetrics
exec.submissionTime = time
exec.sqlText = sqlText
update(exec)
}

@@ -351,6 +352,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var jobs = Map[Int, JobExecutionStatus]()
var stages = Set[Int]()
var driverAccumUpdates = Map[Long, Long]()
var sqlText: String = null

@volatile var metricsValues: Map[Long, String] = null

@@ -369,7 +371,8 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
completionTime,
jobs,
stages,
metricsValues)
metricsValues,
sqlText)
}

}
@@ -91,7 +91,8 @@ class SQLExecutionUIData(
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
val metricValues: Map[Long, String]) {
val metricValues: Map[Long, String],
val sqlText: String) {

@JsonIgnore @KVIndex("completionTime")
private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
@@ -34,7 +34,8 @@ case class SparkListenerSQLExecutionStart(
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long)
time: Long,
sqlText: String)
extends SparkListenerEvent

@DeveloperApi
@@ -41,12 +41,13 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
| "metadata":{},
| "metrics":[]
| },
| "time":0
| "time":0,
| "sqlText":"select 1 as a"
|}
""".stripMargin
val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0, "select 1 as a")
assert(reconstructedEvent == expectedEvent)
}
}