15 changes: 14 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -32,6 +32,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function._
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.analysis._
@@ -179,7 +180,7 @@ class Dataset[T] private[sql](
@transient val sparkSession: SparkSession,
@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
extends Serializable {
extends Serializable with Logging {

queryExecution.assertAnalyzed()

@@ -2154,6 +2155,18 @@ class Dataset[T] private[sql](
colNames,
"in given column names",
sparkSession.sessionState.conf.caseSensitiveAnalysis)
var numProjects = 0
var currPlan = logicalPlan
while (currPlan.isInstanceOf[Project] && numProjects < 50) {
@dongjoon-hyun (Member) commented on Dec 17, 2018:
Yep. If we need to warn, +1 for adding a new configuration for this value instead of hardcoding 50 here and at line 2164.

50 looks effective for detecting this pattern, but can we use a higher value that is more practically related to the warning message (performance degradation or OOM)?

The PR author (Contributor) replied:
Yes, I just wanted to be sure that we agree on the idea. Do you have any hints/preferences for the name of the config?
I didn't want to introduce a high value so that the loop performing this check does not itself add a noticeable performance cost. What do you think?

@HeartSaVioR (Contributor) commented on Dec 17, 2018:
How about checking the count of consecutive projections and keeping/reducing that count? I can't imagine end users creating more than, say, 20 consecutive projections unless they are using withColumn/drop/etc. instead of select.

The PR author (Contributor) replied:
What do you mean, @HeartSaVioR? I don't think it is a good idea to add a counter to the Dataset class; moreover, it would have to be carried over when creating a new Dataset, otherwise it would be useless. That would be overkill for this, IMO.

@HeartSaVioR (Contributor) replied:
My bad. I just re-read the code (the while loop) and now see that this implementation already considers only consecutive projections. Sorry about the confusion.

The PR author (Contributor) replied:
No problem, thanks for your comment.

numProjects += 1
currPlan = currPlan.children.head // Since it is a Project, it has exactly one child
}
if (numProjects == 50) {
logWarning("The current plan contains many Project nodes at the top. This usually happens " +
"when using `withColumn` in a loop. Please avoid this pattern, as it can seriously affect " +
"performance and even cause OOM due to the huge size of the generated plan. Please use " +
"a single select providing all the needed columns instead.")
}

val resolver = sparkSession.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
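
For readers unfamiliar with the pattern being flagged: every withColumn call wraps the current analyzed plan in an additional Project node, so building columns in a loop produces a deep chain of consecutive Projects, which is exactly what the new loop counts. Below is a minimal, hypothetical sketch (not part of this PR; the object and column names are invented) contrasting the loop with an equivalent single select:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object ProjectChainSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("project-chain-sketch").getOrCreate()
    import spark.implicits._

    val base = Seq((1, 2), (3, 4)).toDF("a", "b")

    // Anti-pattern: each withColumn call stacks one more Project on the analyzed plan,
    // so this loop builds a chain of 60 consecutive Projects.
    val looped = (1 to 60).foldLeft(base) { (df, i) =>
      df.withColumn(s"c$i", col("a") + lit(i))
    }

    // Preferred: a single select adds all derived columns in one Project.
    val derived = (1 to 60).map(i => (col("a") + lit(i)).as(s"c$i"))
    val single = base.select((col("*") +: derived): _*)

    // Comparing the analyzed plans shows the difference in plan depth.
    looped.explain(true)
    single.explain(true)

    spark.stop()
  }
}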
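
On the configuration suggested in the thread: a hedged sketch of what such an entry could look like inside org.apache.spark.sql.internal.SQLConf, following the existing buildConf pattern. The key name, doc string, and default below are placeholders, not values agreed in this review:

// Hypothetical addition to object SQLConf (not part of this PR); the key name and
// default value are placeholders.
val MAX_TOP_PROJECTS_FOR_WARNING =
  buildConf("spark.sql.maxTopProjectsBeforeWarning")
    .doc("Number of consecutive Project nodes at the top of the analyzed plan above which " +
      "Dataset logs a warning about the withColumn-in-a-loop anti-pattern.")
    .intConf
    .createWithDefault(50)

// In Dataset, the hardcoded 50 would then be read from the session configuration, e.g.:
// val maxProjects = sparkSession.sessionState.conf.getConf(SQLConf.MAX_TOP_PROJECTS_FOR_WARNING)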