Commit 2dc4ce1

address comments
1 parent 5008eb6 commit 2dc4ce1

5 files changed: 23 additions & 14 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 13 additions & 1 deletion
@@ -47,6 +47,9 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
     s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
   }
 
+  // Metadata that describes more details of this scan.
+  protected def metadata: Map[String, String]
+
   override def simpleString: String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
@@ -75,6 +78,7 @@ case class RowDataSourceScanExec(
     fullOutput: Seq[Attribute],
     requiredColumnsIndex: Seq[Int],
     filters: Set[Filter],
+    handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val tableIdentifier: Option[TableIdentifier])
@@ -125,6 +129,15 @@ case class RowDataSourceScanExec(
       """.stripMargin
   }
 
+  override val metadata: Map[String, String] = {
+    val markedFilters = for (filter <- filters) yield {
+      if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
+    }
+    Map(
+      "ReadSchema" -> output.toStructType.catalogString,
+      "PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
+  }
+
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override lazy val canonicalized: SparkPlan =
     copy(
@@ -248,7 +261,6 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  // These metadata values make scan plans uniquely identifiable for equality checking.
   override val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
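
The metadata override added to RowDataSourceScanExec above prefixes each pushed filter that the data source evaluates entirely on its own with a leading *, so explain output can distinguish them from filters Spark still re-evaluates. A minimal, self-contained sketch of that marking logic (plain strings stand in for org.apache.spark.sql.sources.Filter, and the filter names are hypothetical):

object FilterMarkingSketch {
  // Render the filter set the way the "PushedFilters" metadata entry does:
  // filters in the handled set get a leading `*`, the rest print as-is.
  def markFilters(filters: Set[String], handledFilters: Set[String]): String = {
    val markedFilters = for (filter <- filters) yield {
      if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
    }
    markedFilters.mkString("[", ", ", "]")
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical case: the source handles IsNotNull(a) but not GreaterThan(a,1).
    println(markFilters(Set("IsNotNull(a)", "GreaterThan(a,1)"), Set("IsNotNull(a)")))
    // Prints, up to set ordering: [*IsNotNull(a), GreaterThan(a,1)]
  }
}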

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 0 additions & 5 deletions
@@ -71,11 +71,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     super.makeCopy(newArgs)
   }
 
-  /**
-   * @return Metadata that describes more details of this SparkPlan.
-   */
-  def metadata: Map[String, String] = Map.empty
-
   /**
    * @return All metrics containing metrics of this SparkPlan.
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 1 addition & 3 deletions
@@ -31,7 +31,6 @@ class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
-    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo]) {
 
   override def hashCode(): Int = {
@@ -58,7 +57,6 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
-      plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 7 additions & 2 deletions
@@ -288,6 +288,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         l.output,
         l.output.indices,
         Set.empty,
+        Set.empty,
         toCatalystRDD(l, baseRelation.buildScan()),
         baseRelation,
         None) :: Nil
@@ -372,8 +373,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         relation.output,
         requestedColumns.map(relation.output.indexOf),
         pushedFilters.toSet,
+        handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, relation.catalogTable.map(_.identifier))
+        relation.relation,
+        relation.catalogTable.map(_.identifier))
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // A set of column attributes that are only referenced by pushed down filters. We can
@@ -392,8 +395,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         relation.output,
         requestedColumns.map(relation.output.indexOf),
         pushedFilters.toSet,
+        handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, relation.catalogTable.map(_.identifier))
+        relation.relation,
+        relation.catalogTable.map(_.identifier))
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
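
Each call site above now threads a handledFilters set into RowDataSourceScanExec alongside the pushed filters (the plain buildScan path passes Set.empty twice, since it pushes nothing). This diff does not show where that set is computed; one plausible derivation, sketched here as an assumption rather than code from this commit, uses the existing BaseRelation.unhandledFilters hook: the handled subset is the pushed set minus whatever the source reports it cannot evaluate.

import org.apache.spark.sql.sources.{BaseRelation, Filter}

object HandledFiltersSketch {
  // Sketch only: the source reports which of the pushed filters it cannot
  // evaluate by itself; the complement is the handled subset that earns a `*`.
  def handledSubset(relation: BaseRelation, pushedFilters: Seq[Filter]): Set[Filter] = {
    val unhandled = relation.unhandledFilters(pushedFilters.toArray).toSet
    pushedFilters.toSet -- unhandled
  }
}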

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala

Lines changed: 2 additions & 3 deletions
@@ -113,7 +113,7 @@ object SparkPlanGraph {
       }
       val node = new SparkPlanGraphNode(
         nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-        planInfo.simpleString, planInfo.metadata, metrics)
+        planInfo.simpleString, metrics)
       if (subgraph == null) {
         nodes += node
       } else {
@@ -143,7 +143,6 @@ private[ui] class SparkPlanGraphNode(
     val id: Long,
     val name: String,
     val desc: String,
-    val metadata: Map[String, String],
     val metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
@@ -177,7 +176,7 @@ private[ui] class SparkPlanGraphCluster(
     desc: String,
     val nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
     metrics: Seq[SQLPlanMetric])
-  extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {
+  extends SparkPlanGraphNode(id, name, desc, metrics) {
 
   override def makeDotNode(metricsValue: Map[Long, String]): String = {
     val duration = metrics.filter(_.name.startsWith(WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
