diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 7cad305aefeb8..f629f36642bb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -155,17 +155,30 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
       val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
-      // ensure there is a projection, which will produce unsafe rows required by some operators
-      ProjectExec(r.output,
-        MicroBatchScanExec(
-          r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
+      val scanExec = MicroBatchScanExec(
+        r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
+
+      val withProjection = if (scanExec.supportsColumnar) {
+        scanExec
+      } else {
+        // Add a Project here to make sure we produce unsafe rows.
+        ProjectExec(r.output, scanExec)
+      }
+
+      withProjection :: Nil
 
     case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
       val continuousStream = r.stream.asInstanceOf[ContinuousStream]
-      // ensure there is a projection, which will produce unsafe rows required by some operators
-      ProjectExec(r.output,
-        ContinuousScanExec(
-          r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil
+      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
+
+      val withProjection = if (scanExec.supportsColumnar) {
+        scanExec
+      } else {
+        // Add a Project here to make sure we produce unsafe rows.
+        ProjectExec(r.output, scanExec)
+      }
+
+      withProjection :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 