Skip to content

Commit bd58665

Browse files
committed
add requiredSchema
1 parent 9558823 commit bd58665

File tree

3 files changed

+7
-1
lines changed

3 files changed

+7
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ trait DataSourceScanExec extends LeafExecNode {
100100
/** Physical plan node for scanning data from a relation. */
101101
case class RowDataSourceScanExec(
102102
output: Seq[Attribute],
103+
requiredSchema: StructType,
103104
filters: Set[Filter],
104105
handledFilters: Set[Filter],
105106
rdd: RDD[InternalRow],
@@ -133,13 +134,14 @@ case class RowDataSourceScanExec(
133134
if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
134135
}
135136
Map(
136-
"ReadSchema" -> output.toStructType.catalogString,
137+
"ReadSchema" -> requiredSchema.catalogString,
137138
"PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
138139
}
139140

140141
// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
141142
override def doCanonicalize(): SparkPlan =
142143
copy(
144+
output.map(QueryPlan.normalizeExpressions(_, output)),
143145
rdd = null,
144146
tableIdentifier = None)
145147
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
307307
case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
308308
RowDataSourceScanExec(
309309
l.output,
310+
l.output.toStructType,
310311
Set.empty,
311312
Set.empty,
312313
toCatalystRDD(l, baseRelation.buildScan()),
@@ -379,6 +380,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
379380

380381
val scan = RowDataSourceScanExec(
381382
requestedColumns,
383+
requestedColumns.toStructType,
382384
pushedFilters.toSet,
383385
handledFilters,
384386
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -400,6 +402,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
400402

401403
val scan = RowDataSourceScanExec(
402404
requestedColumns,
405+
requestedColumns.toStructType,
403406
pushedFilters.toSet,
404407
handledFilters,
405408
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
6666
val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
6767
val dsScan = RowDataSourceScanExec(
6868
output,
69+
output.toStructType,
6970
translated.toSet,
7071
pushed.toSet,
7172
unsafeRowRDD,

0 commit comments

Comments (0)