
Conversation

@LantaoJin (Contributor) commented Mar 12, 2018

What changes were proposed in this pull request?

SPARK-4871 already added the SQL statement to the job description when using spark-sql, but it has some problems:

  1. A long SQL statement cannot be fully displayed in the description column.
    [screenshot: screen shot 2018-03-12 at 14 25 51]

  2. SQL statements submitted via spark-shell or spark-submit are not covered.

At eBay, most Spark applications, such as ETL jobs, use spark-submit to schedule their jobs with a few SQL files. The SQL statements in those applications cannot be seen in the current Spark UI. Even if we get the SQL files, they contain many variables, such as "select * from ${workingBD}.table where data_col=${TODAY}". So this patch captures the actual executed SQL text and displays it on the SQL page:

[screenshot: screen shot 2018-03-12 at 20 16 23]

How was this patch tested?

[screenshot: screen shot 2018-03-12 at 20 16 14]

@LantaoJin (Contributor, Author)

@gatorsmile @cloud-fan Could you take a look and leave some comments?

@AmplabJenkins

Can one of the admins verify this patch?

@cloud-fan (Contributor)

what if an SQL execution triggers multiple jobs?

@wangyum (Member) commented Mar 13, 2018

  1. Double-clicking the SQL statement shows the full statement: [SPARK-8145][WebUI] Trigger a double click on the span to show full job description. #6646
  2. What if this SQL statement contains --hiveconf or --hivevar?

@LantaoJin (Contributor, Author)

@cloud-fan one SQL execution has only one SQL statement, no matter how many jobs it triggers.

@LantaoJin (Contributor, Author) commented Mar 13, 2018

What if this SQL statement contains --hiveconf or --hivevar?

What do you mean? Can you give an example?

@wangyum (Member) commented Mar 13, 2018

cat <<EOF > test.sql
select '\${a}', '\${b}';
EOF

spark-sql --hiveconf a=avalue --hivevar b=bvalue -f test.sql

Will the SQL text be select ${a}, ${b} or select avalue, bvalue?

private val executionIdToSqlText = new ConcurrentHashMap[Long, String]()

def setSqlText(sqlText: String): Unit = {
  executionIdToSqlText.putIfAbsent(_nextExecutionId.get(), sqlText)
Contributor:

Does the executionId used here match the current execution? IIUC, the execution id is incremented in withNewExecutionId, and the one you use here most likely refers to the previous execution; please correct me if I'm wrong.

@LantaoJin (Contributor, Author) Mar 13, 2018:

setSqlText is invoked before withNewExecutionId. The first time, _nextExecutionId is 0 by default, so setSqlText stores (0, x) in the map. When withNewExecutionId is invoked, val executionId = SQLExecution.nextExecutionId increments the execution id and returns the previous value, 0. Then val sqlText = getSqlText(executionId) returns the SQL text mapped to 0, i.e. x. The next time setSqlText is invoked, _nextExecutionId.get() returns the incremented id, 1, so the new SQL text is stored in the map as (1, y).
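
In other words, a minimal self-contained sketch of this handoff (the names follow the PR, but the bodies are simplified; the real withNewExecutionId does much more):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object ExecutionIdHandoffSketch {
  private val _nextExecutionId = new AtomicLong(0)
  private val executionIdToSqlText = new ConcurrentHashMap[Long, String]()

  // Invoked first: stores the text under the id the NEXT execution will take.
  def setSqlText(sqlText: String): Unit =
    executionIdToSqlText.putIfAbsent(_nextExecutionId.get(), sqlText)

  // Invoked afterwards: getAndIncrement returns the same id setSqlText saw,
  // so the lookup finds the text stored above.
  def withNewExecutionId[T](body: => T): T = {
    val executionId = _nextExecutionId.getAndIncrement()
    val sqlText = executionIdToSqlText.get(executionId)
    println(s"execution $executionId runs: $sqlText")
    body
  }
}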

Contributor:

Ohh, I see. Sorry I misunderstood it.

@LantaoJin (Contributor, Author)

@wangyum Good point. Unfortunately it is select ${a}, ${b}. Let me fix it.

@cloud-fan (Contributor)

So this patch duplicates the SQL text info from the jobs page onto the SQL query page. I think it's good and more user-friendly, but we need to make sure the underlying implementation reuses the code, to avoid problems like the missing --hivevar substitution.

@LantaoJin (Contributor, Author) commented Mar 14, 2018

Thanks a lot, @cloud-fan. Problems like the missing --hivevar substitution also exist in the current implementation (displaying SQL text on the jobs pages); I will try to fix that in my ticket. To be more accurate, this patch not only moves the SQL text from the jobs page to the SQL query page, but also solves the problem that SQL text cannot be captured from bin/spark-submit or bin/spark-shell. bin/spark-sql (client deploy mode) is mostly used in ad-hoc scenarios; many other Spark SQL scenarios, such as daily warehouse jobs and ETL jobs that need to be submitted to the cluster, are not covered by SPARK-4871.

@LantaoJin (Contributor, Author)

Hi @wangyum, the problem with variable substitution is now resolved.
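
Concretely, the fix runs the raw text through Spark's internal substitution before capturing it, so the captured text matches what actually executes. A minimal sketch, assuming org.apache.spark.sql.internal.VariableSubstitution and a conf entry standing in for --hiveconf a=avalue:

import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

val conf = new SQLConf
conf.setConfString("a", "avalue") // stands in for --hiveconf a=avalue
val substitutor = new VariableSubstitution(conf)

// The raw template is resolved before being stored, so the UI shows
// "select 'avalue'" rather than "select '${a}'".
val captured = substitutor.substitute("select '${a}'")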

}

def getSqlText(executionId: Long): String = {
  executionIdToSqlText.get(executionId)
Contributor:

what if this execution doesn't have SQL text?

@LantaoJin (Contributor, Author):

It shows nothing

* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
  SQLExecution.setSqlText(substitutor.substitute(sqlText))
Contributor:

I think the most difficult part is how to connect the SQL text to the execution. I don't think the current approach works, e.g.

val df = spark.sql("xxxxx")
spark.range(10).count()

You set the SQL text for the next execution, but the next execution may not happen on this dataframe.

I think SQL text should belong to a DataFrame, and executions on this dataframe show the SQL text. e.g.

val df = spark.sql("xxxxxx")
df.collect() // this should show sql text on the UI
df.count() // shall we show the sql text?
df.show() // this adds a limit on top of the query plan, but ideally we should show the sql text.
df.filter(...).collect() // how about this?

@LantaoJin (Contributor, Author):

@cloud-fan, binding the SQL text to the DataFrame is a good idea. I'm trying to fix the cases you listed above.

Contributor:

It's better to answer the list first. Strictly speaking, except for collect, most DataFrame operations create another DataFrame and execute that one: e.g. .count() creates a new DataFrame with an aggregate, and .show() creates a new DataFrame with a limit.

It seems like df.count should not show the SQL text, but df.show should, as it's very common.

@LantaoJin (Contributor, Author)

@cloud-fan, please review.
The test result is:
val df = spark.sql("xxxxx")
spark.range(10).count() // nothing shows in UI
df.collect() // show sql text "xxxxx" on the UI
df.count() // show sql text "xxxxx" on the UI
df.show() // show sql text "xxxxx" on the UI
df.filter(...).collect() // show sql text "xxxxx" on the UI

@LantaoJin (Contributor, Author)

@cloud-fan @jerryshao In the last commit, it seems I hit a Scala bug. :-(

[error] /Users/lajin/git/my/spark/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:63: in object Dataset, multiple overloaded alternatives of define default arguments
[error] Error occurred in an application involving default arguments.
[error] private[sql] object Dataset {
[error] ^

https://stackoverflow.com/questions/24991209/scala-2-11-complains-with-multiple-overloaded-alternatives-of-method
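
For reference, the limitation reproduces with just two overloads that both declare default arguments:

// Minimal reproduction: Scala forbids two overloaded alternatives
// that both define default arguments.
object Dataset {
  def apply(name: String, limit: Int = 10): String = s"$name limit $limit"
  def apply(id: Long, limit: Int = 10): String = s"$id limit $limit"
  // error: in object Dataset, multiple overloaded alternatives of
  //        method apply define default arguments
}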

@cloud-fan (Contributor)

Sorry, I didn't state it clearly enough. I was not suggesting showing the SQL text for all of these cases, but trying to raise a discussion about when we should show it. E.g., for df.count() and df.filter(...).count it seems we should not show it.

@LantaoJin (Contributor, Author) commented Mar 19, 2018

@cloud-fan, please review.
Now the test result is:
val df = spark.sql("xxxxx")
spark.range(10).collect() // nothing shows in UI
df.collect() // shows sql text "xxxxx"
df.count() // nothing shows in UI
df.show() // shows sql text "xxxxx"
df.filter(...).collect() // shows sql text "xxxxx"
df.filter(...).count() // nothing shows in UI

  @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
- encoder: Encoder[T])
+ encoder: Encoder[T],
+ val sqlText: String = "")
Contributor:

what's the exact rule you defined to decide whether or not we should propagate the sql text?

Contributor:

And how does the SQL shell execute commands like SELECT * FROM ...? Does it display all the rows, or add a LIMIT before displaying? Generally we should not propagate the SQL text: a new DataFrame usually means the plan has changed, so the SQL text is no longer accurate.

@LantaoJin (Contributor, Author) Mar 20, 2018:

Thanks for your review. I agree with this comment. Before the discussion, let me describe the scenario our company faces. Team A developed a framework to submit applications with SQL statements in a file:

spark-submit --master yarn-cluster --class com.ebay.SQLFramework -s biz.sql

In biz.sql, there are many SQL statements like

create or replace temporary view view_a as select xx from ${old_db}.table_a where dt=${check_date};
insert overwrite table ${new_db}.table_a select xx from view_a join ${new_db}.table_b;
...

There is no case like
val df = spark.sql("xxxxx")
spark.range(10).collect()
df.filter(..).count()

Team B (Platform) needs to capture the actual SQL statements executed across the whole cluster, since the SQL files from Team A contain many variables. A better way is to record the actual SQL statements in the event log.

OK, back to the discussion. The original purpose is to display the SQL statement the user inputs. Neither spark.range(10).collect() nor df.filter(..).count() is a SQL statement the user inputs; only "xxxxx" is. So I have two proposals and a further thought.

  1. Change the display behavior: only display SQL statements that trigger an action, like "create table", "insert overwrite", etc., and ignore plain select statements. Then the SQL text is never propagated, and the test case above would show nothing in the SQL UI. Also, the UI would show "SQL text which triggers this execution" instead of "SQL text".
  2. Add a SQLCommandEvent and post an event carrying the SQL statement in SparkSession.sql(); then the EventLoggingListener just logs it to the event log. I am not sure whether we can still get the SQL text in the UI this way.

Furthermore, what about opening another ticket to add a command-line option --sqlfile biz.sql to spark-submit? biz.sql must be a file consisting of SQL statements. Based on this implementation, not only client mode but also cluster mode could run pure SQL.

What do you think? @cloud-fan

Contributor:

spark-submit --master yarn-cluster --class com.ebay.SQLFramework -s biz.sql

How does com.ebay.SQLFramework process the SQL file? Does it just call spark.sql(xxxx).show, or something else?

@LantaoJin (Contributor, Author):

Your speculation is almost right. It first calls val df = spark.sql(...), then classifies the SQL text by pattern matching into three types: count, limit, and other. If count, it invokes df.showString(2, 20); if limit, it just invokes df.limit(1).foreach; the last type, other, does nothing further.
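
A hedged sketch of that loop (com.ebay.SQLFramework is internal, so everything below is illustrative; df.show stands in for the private[sql] df.showString(2, 20), and df.limit(1).collect() for the foreach call):

import scala.io.Source
import org.apache.spark.sql.SparkSession

object SqlFrameworkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val statements =
      Source.fromFile(args(0)).mkString.split(";").map(_.trim).filter(_.nonEmpty)

    statements.foreach { stmt =>
      val df = spark.sql(stmt) // DDL/DML side effects happen here
      // Classify the statement text by pattern matching, as described above.
      stmt.toLowerCase match {
        case s if s.contains("count(")  => df.show(2)
        case s if s.contains(" limit ") => df.limit(1).collect()
        case _                          => () // the "other" type does nothing further
      }
    }
  }
}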

@LantaoJin (Contributor, Author) commented Mar 21, 2018

I have decoupled the sqlText from the SQL execution. In the current implementation, when the user invokes spark.sql(xx), a new SparkListenerSQLTextCaptured event is posted to the listener bus. Then in SQLAppStatusListener the information is stored, and all the SQL statements are displayed in AllExecutionsPage, ordered by submission time, instead of in each ExecutionPage. I will upload the commit after testing. (Or would it be better to create a new PR?)
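
A sketch of the shape of this approach (the event's field names are illustrative, and posting it assumes Spark-internal access to the listener bus):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import scala.collection.mutable.ArrayBuffer

// The new event carrying the substituted SQL text and its submission time.
case class SparkListenerSQLTextCaptured(sqlText: String, submissionTime: Long)
  extends SparkListenerEvent

// SQLAppStatusListener would pick the event up via onOtherEvent and keep
// the texts ordered by submission time for AllExecutionsPage.
class SqlTextListenerSketch extends SparkListener {
  private val captured = ArrayBuffer.empty[(Long, String)]

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLTextCaptured =>
      captured += ((e.submissionTime, e.sqlText))
    case _ =>
  }
}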

@LantaoJin (Contributor, Author)

[screenshot: screen shot 2018-03-21 at 23 22 07]

def sql(sqlText: String): DataFrame = {
- Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
+ Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText),
+   substitutor.substitute(sqlText))
Member:

Hi, @LantaoJin.
What you need is just grabbing the initial SQL text here; you can use a Spark extension. Please refer to the Spark Atlas Connector for sample code.

Member:

You may want to refactor this PR into a ParserExtension part and a UI part. I think that would be less intrusive than the current implementation.
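
For illustration, a sketch against the Spark 2.3-era ParserInterface; the wrapper and extension names are made up, and the capture point would post an event or write to the event log rather than print:

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// Records the SQL text of every parsePlan call, then delegates to the real parser.
class SqlTextCapturingParser(delegate: ParserInterface) extends ParserInterface {
  override def parsePlan(sqlText: String): LogicalPlan = {
    println(s"captured SQL: $sqlText") // capture point
    delegate.parsePlan(sqlText)
  }
  // Everything else just delegates.
  override def parseExpression(sqlText: String): Expression =
    delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)
  override def parseTableSchema(sqlText: String): StructType =
    delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String): DataType =
    delegate.parseDataType(sqlText)
}

// Enabled with --conf spark.sql.extensions=SqlTextExtension
class SqlTextExtension extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit =
    ext.injectParser((_, parser) => new SqlTextCapturingParser(parser))
}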

@dongjoon-hyun (Member) Mar 22, 2018:

BTW, in general, the initial SQL text easily becomes meaningless when other operations are added. In your example, the following case shows a misleading, wrong SQL statement instead of the SQL plan that really executed.

val df = spark.sql("xxxxx")
df.filter(...).collect() // shows sql text "xxxxx"

As another example, please try the following. It will show you select a,b from t1.

scala> spark.sql("select a,b from t1").select("a").show
+---+
|  a|
+---+
|  1|
+---+

@LantaoJin (Contributor, Author):

the following case shows a misleading, wrong SQL statement instead of the SQL plan that really executed.

Yes, we know this; that's why the current implementation, which binds the SQL text to the DataFrame, is not good.

@LantaoJin (Contributor, Author)

Hi @jerryshao @cloud-fan @dongjoon-hyun, I would like to close this PR and open another one, #20876. Would you please move to that one?
