
Commit 97b0ece

fenzhu authored and GitHub Enterprise committed

[CARMEL-6152] [Follow-up] Calculate more accurate information in the view (#1055)

1 parent 156d8d9 commit 97b0ece

1 file changed: +29 additions, -12 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeViewCommand.scala

Lines changed: 29 additions & 12 deletions
@@ -24,11 +24,12 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, DynamicPruningExpression, Expression, SubqueryExpression}
-import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryBroadcastExec, WindowSortLimitExec}
-import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastRangeExchangeExec, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, DynamicPruningExpression, Expression}
+import org.apache.spark.sql.execution.{FileSourceScanExec, InputAdapter, InSubqueryExec, SparkPlan, SubqueryBroadcastExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
+import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.types.{BooleanType, DateType, NumericType, StringType, StructField}
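The substantive import change is that operator matching moves from concrete physical operators to their base classes. Assuming the stock Spark 3.x class hierarchy, the base types subsume every concrete operator the old code enumerated (and also pick up siblings the old list missed, such as ObjectHashAggregateExec). A quick spark-shell check of that assumption:

import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}

// Each base class is a supertype of the concrete operators that were matched before,
// so a single `case _: Exchange` / `case _: BaseJoinExec` / `case _: BaseAggregateExec`
// arm cannot drop any of them.
assert(classOf[Exchange].isAssignableFrom(classOf[ShuffleExchangeExec]))
assert(classOf[Exchange].isAssignableFrom(classOf[BroadcastExchangeExec]))
assert(classOf[BaseJoinExec].isAssignableFrom(classOf[SortMergeJoinExec]))
assert(classOf[BaseJoinExec].isAssignableFrom(classOf[ShuffledHashJoinExec]))
assert(classOf[BaseJoinExec].isAssignableFrom(classOf[BroadcastHashJoinExec]))
assert(classOf[BaseAggregateExec].isAssignableFrom(classOf[HashAggregateExec]))
assert(classOf[BaseAggregateExec].isAssignableFrom(classOf[SortAggregateExec]))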

@@ -83,8 +84,8 @@ case class AnalyzeViewCommand(view: TableIdentifier) extends RunnableCommand {
   private def generatePredicate(field: StructField): String = {
     val name = field.name
     field.dataType match {
-      case _: NumericType => s"$name > 1"
-      case _: StringType => s"$name = 'xxx'"
+      case _: NumericType => s"$name != 1"
+      case _: StringType => s"$name != 'xxx'"
       case _: BooleanType => s"$name = boolean('true')"
       case _: DateType => s"$name >= '2000-01-01'"
       case _ => s"$name is not null"
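For reference, the predicate generation after this change, reconstructed from the hunk above, with a small spark-shell example of the predicates it now produces; the sample schema is only illustrative:

import org.apache.spark.sql.types._

// Mirrors generatePredicate after this change (reconstructed from the hunk above).
def generatePredicate(field: StructField): String = {
  val name = field.name
  field.dataType match {
    case _: NumericType => s"$name != 1"
    case _: StringType  => s"$name != 'xxx'"
    case _: BooleanType => s"$name = boolean('true')"
    case _: DateType    => s"$name >= '2000-01-01'"
    case _              => s"$name is not null"
  }
}

// Illustrative schema only: yields "id != 1", "name != 'xxx'", "created_at >= '2000-01-01'".
Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("created_at", DateType)
).map(generatePredicate)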
@@ -155,15 +156,31 @@ case class AnalyzeViewCommand(view: TableIdentifier) extends RunnableCommand {
             e
           case e: Expression => e
         }
-      case _: ShuffleExchangeExec | _: BroadcastExchangeExec | _: BroadcastRangeExchangeExec =>
+        totalOperator = totalOperator + 1
+      case _: Exchange =>
         exchangeCount = exchangeCount + 1
-      case _: SortMergeJoinExec | _: ShuffledHashJoinExec | _: BroadcastHashJoinExec =>
+        totalOperator = totalOperator + 1
+      case _: BaseJoinExec =>
         joinCount = joinCount + 1
-      case _: WindowExec | _: WindowSortLimitExec =>
+        totalOperator = totalOperator + 1
+      case _: WindowExec =>
         windowCount = windowCount + 1
-      case _: HashAggregateExec | _: SortAggregateExec =>
+        totalOperator = totalOperator + 1
+      case _: BaseAggregateExec =>
         aggCount = aggCount + 1
-      case _ => totalOperator = totalOperator + 1
+        totalOperator = totalOperator + 1
+      case _: WholeStageCodegenExec | _: InputAdapter =>
+      case aqe: AdaptiveSparkPlanExec =>
+        val subRes = collectPartitionPredicatesAndOperators(aqe)
+        subRes._1.foreach(t => partitionColumnPredicate.put(t._1, t._2))
+        if (subRes._2.length == 5) {
+          totalOperator = totalOperator + subRes._2.head._2 + 1
+          joinCount = joinCount + subRes._2(1)._2
+          exchangeCount = exchangeCount + subRes._2(2)._2
+          aggCount = aggCount + subRes._2(3)._2
+          windowCount = windowCount + subRes._2(4)._2
+        }
+      case _: SparkPlan => totalOperator = totalOperator + 1
     }
     val info = Seq(("Total", totalOperator), ("Join", joinCount), ("Exchange",
       exchangeCount), ("Aggregation", aggCount), ("Window", windowCount))
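The counting logic now bumps totalOperator in every matched branch, skips WholeStageCodegenExec and InputAdapter wrappers so codegen nodes do not inflate the totals, and explicitly recurses into AdaptiveSparkPlanExec (a leaf node from the traversal's point of view) to merge the five counters of the adaptive subtree. A standalone sketch of the same idea, not the command's actual code; the countOperators helper is hypothetical, and it assumes AdaptiveSparkPlanExec.executedPlan exposes the current physical plan as in OSS Spark 3.x:

import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.BaseJoinExec
import org.apache.spark.sql.execution.window.WindowExec

// Hypothetical sketch: count operators over a physical plan, keeping the same
// Total/Join/Exchange/Aggregation/Window order as the `info` Seq above.
def countOperators(plan: SparkPlan): Seq[(String, Int)] = {
  var total, join, exchange, agg, window = 0
  plan.foreach {
    case _: WholeStageCodegenExec | _: InputAdapter => // codegen wrappers, not counted
    case aqe: AdaptiveSparkPlanExec =>
      total += 1 // extra operator for the adaptive subtree, mirroring the "+ 1" above
      // AdaptiveSparkPlanExec is a leaf node, so foreach does not descend into it;
      // recurse explicitly and fold its counters into ours.
      countOperators(aqe.executedPlan).foreach {
        case ("Total", n)       => total += n
        case ("Join", n)        => join += n
        case ("Exchange", n)    => exchange += n
        case ("Aggregation", n) => agg += n
        case ("Window", n)      => window += n
        case _                  =>
      }
    case _: Exchange          => exchange += 1; total += 1
    case _: BaseJoinExec      => join += 1; total += 1
    case _: BaseAggregateExec => agg += 1; total += 1
    case _: WindowExec        => window += 1; total += 1
    case _: SparkPlan         => total += 1
  }
  Seq(("Total", total), ("Join", join), ("Exchange", exchange),
    ("Aggregation", agg), ("Window", window))
}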
