
Commit 14003d4

huaxingao authored and cloud-fan committed
[SPARK-32590][SQL] Remove fullOutput from RowDataSourceScanExec
### What changes were proposed in this pull request?

Remove `fullOutput` from `RowDataSourceScanExec`.

### Why are the changes needed?

`RowDataSourceScanExec` requires the full output instead of the scan output after column pruning. However, in the v2 code path we no longer have the full output, so we just pass the pruned output. `RowDataSourceScanExec.fullOutput` is therefore meaningless and should be removed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29415 from huaxingao/rm_full_output.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
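To make the change concrete, here is a minimal, self-contained Scala sketch (the `Col` type and the column names are hypothetical stand-ins, not Spark's `Attribute` API). Before this patch the node re-derived its output by indexing into the full relation output; after it, callers pass the pruned output directly:

```scala
// Hypothetical stand-in for Spark's Attribute, for illustration only.
case class Col(name: String)

object FullOutputSketch {
  def main(args: Array[String]): Unit = {
    val fullOutput = Seq(Col("id"), Col("name"), Col("age"))

    // Before: callers supplied fullOutput plus the indices of the pruned
    // columns, and the node computed its output exactly as the removed line
    // `def output = requiredColumnsIndex.map(fullOutput)` did.
    val requiredColumnsIndex = Seq(0, 2)
    val derivedOutput = requiredColumnsIndex.map(fullOutput)

    // After: callers pass the pruned output directly, which is all the v2
    // code path ever had available.
    val prunedOutput = Seq(Col("id"), Col("age"))

    assert(derivedOutput == prunedOutput)
  }
}
```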
1 parent 339eec5 commit 14003d4

3 files changed: +10 -14 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 4 additions & 6 deletions
```diff
@@ -99,17 +99,15 @@ trait DataSourceScanExec extends LeafExecNode {
 
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    fullOutput: Seq[Attribute],
-    requiredColumnsIndex: Seq[Int],
+    output: Seq[Attribute],
+    requiredSchema: StructType,
     filters: Set[Filter],
     handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {
 
-  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
-
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -136,14 +134,14 @@ case class RowDataSourceScanExec(
       if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
     }
     Map(
-      "ReadSchema" -> output.toStructType.catalogString,
+      "ReadSchema" -> requiredSchema.catalogString,
       "PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
   }
 
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override def doCanonicalize(): SparkPlan =
     copy(
-      fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
+      output.map(QueryPlan.normalizeExpressions(_, output)),
       rdd = null,
       tableIdentifier = None)
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -307,7 +307,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
     case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
       RowDataSourceScanExec(
         l.output,
-        l.output.indices,
+        l.output.toStructType,
         Set.empty,
         Set.empty,
         toCatalystRDD(l, baseRelation.buildScan()),
@@ -379,8 +379,8 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         .map(relation.attributeMap)
 
       val scan = RowDataSourceScanExec(
-        relation.output,
-        requestedColumns.map(relation.output.indexOf),
+        requestedColumns,
+        requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -401,8 +401,8 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
       val scan = RowDataSourceScanExec(
-        relation.output,
-        requestedColumns.map(relation.output.indexOf),
+        requestedColumns,
+        requestedColumns.toStructType,
         pushedFilters.toSet,
         handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
```
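In the v1 strategy, `requestedColumns.toStructType` builds the pruned read schema straight from the pruned attributes. A simplified sketch of the shape of that conversion (with stand-in `Attr`/`Field` types rather than Spark's actual `Attribute`/`StructType`):

```scala
// Simplified stand-ins for Spark's Attribute and StructField; the real
// conversion lives in Catalyst, this only illustrates its shape.
case class Attr(name: String, dataType: String, nullable: Boolean = true)
case class Field(name: String, dataType: String, nullable: Boolean)

object ToStructTypeSketch {
  // One field per pruned attribute, keeping name, type, and nullability.
  def toSchema(attrs: Seq[Attr]): Seq[Field] =
    attrs.map(a => Field(a.name, a.dataType, a.nullable))

  def main(args: Array[String]): Unit = {
    val requestedColumns = Seq(Attr("id", "int", nullable = false), Attr("age", "int"))
    // The scan's ReadSchema now comes from this pruned schema instead of
    // being rebuilt from fullOutput and requiredColumnsIndex.
    println(toSchema(requestedColumns))
  }
}
```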

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -64,11 +64,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         }
         val rdd = v1Relation.buildScan()
         val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
-        val originalOutputNames = relation.table.schema().map(_.name)
-        val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
         val dsScan = RowDataSourceScanExec(
           output,
-          requiredColumnsIndex,
+          output.toStructType,
           translated.toSet,
           pushed.toSet,
           unsafeRowRDD,
```
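The removed lines computed indices against the full table schema, while the node's first argument in this path was already the pruned output, so the two could disagree whenever columns were actually pruned. A small self-contained sketch of that mismatch (illustrative names and plain Scala collections, not Spark types):

```scala
object V2IndexMismatchSketch {
  def main(args: Array[String]): Unit = {
    val originalOutputNames = Seq("id", "name", "age") // full table schema names
    val prunedNames = Seq("id", "age")                 // pruned scan output names

    // Old v2 path: look up indices in the *full* schema...
    val requiredColumnsIndex = prunedNames.map(originalOutputNames.indexOf)
    println(requiredColumnsIndex) // List(0, 2)

    // ...yet index 2 does not exist in a two-column pruned sequence.
    // Passing the pruned output and its schema directly removes the
    // indices, and with them the mismatch.
    assert(requiredColumnsIndex.exists(_ >= prunedNames.length))
  }
}
```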
