diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index a934c095eee11..25fd46af116da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -140,13 +140,18 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { |Output: ${output.mkString(", ")} """.stripMargin) - val plan = BatchScanExec(output, scan) + val batchExec = BatchScanExec(output, scan) val filterCondition = postScanFilters.reduceLeftOption(And) - val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan) + val withFilter = filterCondition.map(FilterExec(_, batchExec)).getOrElse(batchExec) - // always add the projection, which will produce unsafe rows required by some operators - ProjectExec(project, withFilter) :: Nil + val withProjection = if (withFilter.output != project || !batchExec.supportsColumnar) { + ProjectExec(project, withFilter) + } else { + withFilter + } + + withProjection :: Nil case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined => val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]