diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 4385843d90112..9d1636ccf2718 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -57,8 +57,8 @@ class ColumnarRule { * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations. */ -case class ColumnarToRowExec(child: SparkPlan) - extends UnaryExecNode with CodegenSupport { +case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { + assert(child.supportsColumnar) override def output: Seq[Attribute] = child.output @@ -66,31 +66,29 @@ case class ColumnarToRowExec(child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering + // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the + // codegen stage and needs to do the limit check. + protected override def canCheckLimitNotReached: Boolean = true + override lazy val metrics: Map[String, SQLMetric] = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time") + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches") ) override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val numInputBatches = longMetric("numInputBatches") - val scanTime = longMetric("scanTime") - // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy - @transient lazy val outputProject = UnsafeProjection.create(output, output) - val batches = child.executeColumnar() - batches.flatMap(batch => { - val batchStartNs = System.nanoTime() - numInputBatches += 1 - // In order to match the numOutputRows metric in the generated code we update - // numOutputRows for each batch. This is less accurate than doing it at output - // because it will over count the number of rows output in the case of a limit, - // but it is more efficient. - numOutputRows += batch.numRows() - val ret = batch.rowIterator().asScala - scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000)) - ret.map(outputProject) - }) + // This avoids calling `output` in the RDD closure, so that we don't need to include the entire + // plan (this) in the closure. 
+ val localOutput = this.output + child.executeColumnar().mapPartitionsInternal { batches => + val toUnsafe = UnsafeProjection.create(localOutput, localOutput) + batches.flatMap { batch => + numInputBatches += 1 + numOutputRows += batch.numRows() + batch.rowIterator().asScala.map(toUnsafe) + } + } } /** @@ -136,9 +134,6 @@ case class ColumnarToRowExec(child: SparkPlan) // metrics val numOutputRows = metricTerm(ctx, "numOutputRows") val numInputBatches = metricTerm(ctx, "numInputBatches") - val scanTimeMetric = metricTerm(ctx, "scanTime") - val scanTimeTotalNs = - ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0 val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.addMutableState(columnarBatchClz, "batch") @@ -156,15 +151,13 @@ case class ColumnarToRowExec(child: SparkPlan) val nextBatchFuncName = ctx.addNewFunction(nextBatch, s""" |private void $nextBatch() throws java.io.IOException { - | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { | $batch = ($columnarBatchClz)$input.next(); + | $numInputBatches.add(1); | $numOutputRows.add($batch.numRows()); | $idx = 0; | ${columnAssigns.mkString("", "\n", "\n")} - | ${numInputBatches}.add(1); | } - | $scanTimeTotalNs += System.nanoTime() - getBatchStart; |}""".stripMargin) ctx.currentVars = null @@ -184,7 +177,7 @@ case class ColumnarToRowExec(child: SparkPlan) |if ($batch == null) { | $nextBatchFuncName(); |} - |while ($batch != null) { + |while ($limitNotReachedCond $batch != null) { | int $numRows = $batch.numRows(); | int $localEnd = $numRows - $idx; | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { @@ -196,13 +189,11 @@ case class ColumnarToRowExec(child: SparkPlan) | $batch = null; | $nextBatchFuncName(); |} - |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - |$scanTimeTotalNs = 0; """.stripMargin } override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() + Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure } } @@ -439,47 +430,46 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode { // Instead of creating a new config we are reusing columnBatchSize. In the future if we do // combine with some of the Arrow conversion tools we will need to unify some of the configs. val numRows = conf.columnBatchSize - val converters = new RowToColumnConverter(schema) - val rowBased = child.execute() - rowBased.mapPartitions(rowIterator => { - new Iterator[ColumnarBatch] { - var cb: ColumnarBatch = null - - TaskContext.get().addTaskCompletionListener[Unit] { _ => - if (cb != null) { - cb.close() - cb = null + // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire + // plan (this) in the closure. 
+ val localSchema = this.schema + child.execute().mapPartitionsInternal { rowIterator => + if (rowIterator.hasNext) { + new Iterator[ColumnarBatch] { + private val converters = new RowToColumnConverter(localSchema) + private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { + OffHeapColumnVector.allocateColumns(numRows, localSchema) + } else { + OnHeapColumnVector.allocateColumns(numRows, localSchema) } - } - - override def hasNext: Boolean = { - rowIterator.hasNext - } + private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - override def next(): ColumnarBatch = { - if (cb != null) { + TaskContext.get().addTaskCompletionListener[Unit] { _ => cb.close() - cb = null } - val columnVectors : Array[WritableColumnVector] = - if (enableOffHeapColumnVector) { - OffHeapColumnVector.allocateColumns(numRows, schema).toArray - } else { - OnHeapColumnVector.allocateColumns(numRows, schema).toArray + + override def hasNext: Boolean = { + rowIterator.hasNext + } + + override def next(): ColumnarBatch = { + cb.setNumRows(0) + var rowCount = 0 + while (rowCount < numRows && rowIterator.hasNext) { + val row = rowIterator.next() + converters.convert(row, vectors.toArray) + rowCount += 1 } - var rowCount = 0 - while (rowCount < numRows && rowIterator.hasNext) { - val row = rowIterator.next() - converters.convert(row, columnVectors) - rowCount += 1 + cb.setNumRows(rowCount) + numInputRows += rowCount + numOutputBatches += 1 + cb } - cb = new ColumnarBatch(columnVectors.toArray, rowCount) - numInputRows += rowCount - numOutputBatches += 1 - cb } + } else { + Iterator.empty } - }) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 728ac3a466fbf..984f4d3474b03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -334,37 +334,61 @@ case class FileSourceScanExec( inputRDD :: Nil } - override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"), - "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"), + "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time") + ) ++ { + // Tracking scan time has overhead, we can't afford to do it for each row, and can only do + // it for each batch. 
+ if (supportsColumnar) { + Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + } else { + None + } + } protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - if (needsUnsafeRowConversion) { inputRDD.mapPartitionsWithIndexInternal { (index, iter) => - val proj = UnsafeProjection.create(schema) - proj.initialize(index) - iter.map( r => { + val toUnsafe = UnsafeProjection.create(schema) + toUnsafe.initialize(index) + iter.map { row => numOutputRows += 1 - proj(r) - }) + toUnsafe(row) + } } } else { - inputRDD.map { r => - numOutputRows += 1 - r + inputRDD.mapPartitionsInternal { iter => + iter.map { row => + numOutputRows += 1 + row + } } } } protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") - inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch => - numOutputRows += batch.numRows() - batch + val scanTime = longMetric("scanTime") + inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => + new Iterator[ColumnarBatch] { + + override def hasNext: Boolean = { + // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. + val startNs = System.nanoTime() + val res = batches.hasNext + scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs) + res + } + + override def next(): ColumnarBatch = { + val batch = batches.next() + numOutputRows += batch.numRows() + batch + } + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index d9d9b1f9016ea..5fda272ce21a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution -import java.io.Writer import java.util.Locale import java.util.concurrent.atomic.AtomicInteger @@ -491,12 +490,8 @@ trait InputRDDCodegen extends CodegenSupport { * * This is the leaf node of a tree with WholeStageCodegen that is used to generate code * that consumes an RDD iterator of InternalRow. - * - * @param isChildColumnar true if the inputRDD is really columnar data hidden by type erasure, - * false if inputRDD is really an RDD[InternalRow] */ -case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean) - extends UnaryExecNode with InputRDDCodegen { +case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen { override def output: Seq[Attribute] = child.output @@ -522,13 +517,10 @@ case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean) child.executeColumnar() } - override def inputRDD: RDD[InternalRow] = { - if (isChildColumnar) { - child.executeColumnar().asInstanceOf[RDD[InternalRow]] // Hack because of type erasure - } else { - child.execute() - } - } + // `InputAdapter` can only generate code to process the rows from its child. If the child produces + // columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by + // overriding `inputRDDs` and calling `InputAdapter#executeColumnar` directly. + override def inputRDD: RDD[InternalRow] = child.execute() // This is a leaf node so the node can produce limit not reached checks. 
override protected def canCheckLimitNotReached: Boolean = true @@ -870,59 +862,45 @@ case class CollapseCodegenStages( /** * Inserts an InputAdapter on top of those that do not support codegen. */ - private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = { - val isColumnar = adjustColumnar(plan, isColumnarInput) + private def insertInputAdapter(plan: SparkPlan): SparkPlan = { plan match { case p if !supportCodegen(p) => // collapse them recursively - InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar) + InputAdapter(insertWholeStageCodegen(p)) case j: SortMergeJoinExec => // The children of SortMergeJoin should do codegen separately. j.withNewChildren(j.children.map( - child => InputAdapter(insertWholeStageCodegen(child, isColumnar), isColumnar))) - case p => - p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar))) + child => InputAdapter(insertWholeStageCodegen(child)))) + case p => p.withNewChildren(p.children.map(insertInputAdapter)) } } /** * Inserts a WholeStageCodegen on top of those that support codegen. */ - private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = { - val isColumnar = adjustColumnar(plan, isColumnarInput) + private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = { plan match { // For operators that will output domain object, do not insert WholeStageCodegen for it as // domain object can not be written into unsafe row. case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => - plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar))) + plan.withNewChildren(plan.children.map(insertWholeStageCodegen)) case plan: LocalTableScanExec => // Do not make LogicalTableScanExec the root of WholeStageCodegen // to support the fast driver-local collect/take paths. plan case plan: CodegenSupport if supportCodegen(plan) => - WholeStageCodegenExec( - insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet()) + // The whole-stage-codegen framework is row-based. If a plan supports columnar execution, + // it can't support whole-stage-codegen at the same time. + assert(!plan.supportsColumnar) + WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet()) case other => - other.withNewChildren(other.children.map(insertWholeStageCodegen(_, isColumnar))) + other.withNewChildren(other.children.map(insertWholeStageCodegen)) } } - /** - * Depending on the stage in the plan and if we currently are columnar or not - * return if we are still columnar or not. 
- */ - private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean = - // We are walking up the plan, so columnar starts when we transition to rows - // and ends when we transition to columns - plan match { - case c2r: ColumnarToRowExec => true - case r2c: RowToColumnarExec => false - case _ => isColumnar - } - def apply(plan: SparkPlan): SparkPlan = { if (conf.wholeStageEnabled) { - insertWholeStageCodegen(plan, false) + insertWholeStageCodegen(plan) } else { plan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 3566ab1aa5a33..e197cd85f7bf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -115,22 +115,19 @@ case class InMemoryTableScanExec( val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled buffers .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled)) - .map(b => { - numOutputRows += b.numRows() - b - }) + .map { buffer => + numOutputRows += buffer.numRows() + buffer + } } private lazy val inputRDD: RDD[InternalRow] = { - val buffers = filteredCachedBatches() - val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled - val numOutputRows = longMetric("numOutputRows") - if (enableAccumulatorsForTest) { readPartitions.setValue(0) readBatches.setValue(0) } + val numOutputRows = longMetric("numOutputRows") // Using these variables here to avoid serialization of entire objects (if referenced // directly) within the map Partitions closure. val relOutput: AttributeSeq = relation.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index c5c902ffc4104..74fc5432ea82c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -79,10 +79,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") - inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { - b => - numOutputRows += b.numRows() - b + inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b => + numOutputRows += b.numRows() + b } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 38a51474cbcd8..b46abdb48e738 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1294,7 +1294,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext { // need to execute the query before we can examine fs.inputRDDs() assert(df.queryExecution.executedPlan match { case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter( - fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _), _))) => + fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)))) => partitionFilters.exists(ExecSubqueryExpression.hasSubquery) && fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala index b1143484a85e8..aa83b9b11dcfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala @@ -44,14 +44,11 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { } // A scan plan tree is a plan tree that has a leaf node under zero or more Project/Filter nodes. - // Because of how codegen and columnar to row transitions work, we may have InputAdaptors - // and ColumnarToRow transformations in the middle of it, but they will not have the tag - // we want, so skip them if they are the first thing we see - private def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match { - case i: InputAdapter if !first => isScanPlanTree(i.child, false) - case c: ColumnarToRowExec if !first => isScanPlanTree(c.child, false) - case p: ProjectExec => isScanPlanTree(p.child, false) - case f: FilterExec => isScanPlanTree(f.child, false) + // We may add `ColumnarToRowExec` and `InputAdapter` above the scan node after planning. + private def isScanPlanTree(plan: SparkPlan): Boolean = plan match { + case ColumnarToRowExec(i: InputAdapter) => isScanPlanTree(i.child) + case p: ProjectExec => isScanPlanTree(p.child) + case f: FilterExec => isScanPlanTree(f.child) case _: LeafExecNode => true case _ => false } @@ -92,7 +89,14 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { case _: SubqueryExec | _: ReusedSubqueryExec => assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty) - case _ if isScanPlanTree(plan, true) => + case _ if isScanPlanTree(plan) => + // `ColumnarToRowExec` and `InputAdapter` are added outside of the planner, which doesn't + // have the logical plan tag. + val actualPlan = plan match { + case ColumnarToRowExec(i: InputAdapter) => i.child + case _ => plan + } + // The strategies for planning scan can remove or add FilterExec/ProjectExec nodes, // so it's not simple to check. Instead, we only check that the origin LogicalPlan // contains the corresponding leaf node of the SparkPlan. @@ -100,7 +104,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { // logical = Project(Filter(Scan A)) // physical = ProjectExec(ScanExec A) // we only check that leaf modes match between logical and physical plan. 
- val logicalLeaves = getLogicalPlan(plan).collectLeaves() + val logicalLeaves = getLogicalPlan(actualPlan).collectLeaves() val physicalLeaves = plan.collectLeaves() assert(logicalLeaves.length == 1) assert(physicalLeaves.length == 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 59b9e155049b5..55dff16887cb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.aggregate.HashAggregateExec -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.expressions.scalalang.typed @@ -120,6 +120,30 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } + test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") { + import testImplicits._ + + val dsInt = spark.range(3).cache() + dsInt.count() + val dsIntFilter = dsInt.filter(_ > 0) + val planInt = dsIntFilter.queryExecution.executedPlan + assert(planInt.collect { + case WholeStageCodegenExec(FilterExec(_, + ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => () + }.length == 1) + assert(dsIntFilter.collect() === Array(1, 2)) + + // cache for string type is not supported for InMemoryTableScanExec + val dsString = spark.range(3).map(_.toString).cache() + dsString.count() + val dsStringFilter = dsString.filter(_ == "1") + val planString = dsStringFilter.queryExecution.executedPlan + assert(planString.collect { + case _: ColumnarToRowExec => () + }.isEmpty) + assert(dsStringFilter.collect() === Array("1")) + } + test("SPARK-19512 codegen for comparing structs is incorrect") { // this would raise CompileException before the fix spark.range(10) @@ -196,6 +220,25 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) } + ignore("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") { + import testImplicits._ + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + i).as(s"c$i")} : _*) + df.write.mode(SaveMode.Overwrite).parquet(path) + + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202", + SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") { + // wide table batch scan causes the byte code of codegen exceeds the limit of + // WHOLESTAGE_HUGE_METHOD_LIMIT + val df2 = spark.read.parquet(path) + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsColumnar) + checkAnswer(df2, df) + } + } + } + test("Control splitting consume function by operators with config") { import testImplicits._ val df = spark.range(10).select(Seq.tabulate(2) {i => ('id + i).as(s"c$i")} : _*) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 8a18a1ab5406f..289cc667a1c66 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -50,7 +50,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { case f @ FilterExec( And(_: AttributeReference, _: AttributeReference), - InputAdapter(_: BatchEvalPythonExec, _)) => f + InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(FilterExec(_: In, _))) => b } assert(qualifiedPlanNodes.size == 2) @@ -60,7 +60,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val df = Seq(("Hello", 4)).toDF("a", "b") .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)") val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { - case f @ FilterExec(_: AttributeReference, InputAdapter(_: BatchEvalPythonExec, _)) => f + case f @ FilterExec(_: AttributeReference, InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(FilterExec(_: In, _))) => b } assert(qualifiedPlanNodes.size == 2) @@ -72,7 +72,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { case f @ FilterExec( And(_: AttributeReference, _: GreaterThan), - InputAdapter(_: BatchEvalPythonExec, _)) => f + InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b } assert(qualifiedPlanNodes.size == 2) @@ -85,7 +85,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { case f @ FilterExec( And(_: AttributeReference, _: GreaterThan), - InputAdapter(_: BatchEvalPythonExec, _)) => f + InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b } assert(qualifiedPlanNodes.size == 2)
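
Two recurring patterns in this patch are worth spelling out next to the diff. First, both ColumnarToRowExec and RowToColumnarExec copy a field such as `output` or `schema` into a local val before entering the RDD closure; referencing the field directly inside the closure would capture `this`, i.e. the whole plan node, and force it to be serialized with every task. A minimal standalone sketch of that pattern follows (illustrative only, not part of the patch: it uses the public `mapPartitions` rather than Spark's internal `mapPartitionsInternal`, and `UnsafeConversionSketch` is a hypothetical name):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

object UnsafeConversionSketch {
  // Converts generic InternalRows to UnsafeRows while shipping only the attribute
  // list to the executors, not the plan node that owns it.
  def toUnsafeRows(input: RDD[InternalRow], planOutput: Seq[Attribute]): RDD[InternalRow] = {
    val localOutput = planOutput        // the closure below captures this local, not a plan node
    input.mapPartitions { iter =>
      // UnsafeProjection is not serializable, so it is created on the executor side,
      // once per partition rather than once per row.
      val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
      iter.map(toUnsafe)
    }
  }
}

Second, FileSourceScanExec now accumulates the "scan time" metric inside `hasNext` of a wrapping iterator, because `FileScanRDD` does the actual reading when `hasNext` is called, and the overhead is only affordable per batch rather than per row. A generic version of that wrapper might look like the sketch below (again hypothetical names: `TimedIterator`, `addMillis`); in the patch the role of `addMillis` is played by the `scanTime` SQLMetric.

import java.util.concurrent.TimeUnit.NANOSECONDS

class TimedIterator[T](underlying: Iterator[T], addMillis: Long => Unit) extends Iterator[T] {
  override def hasNext: Boolean = {
    // The underlying iterator does the expensive work here, so this is where we time it.
    val startNs = System.nanoTime()
    val res = underlying.hasNext
    addMillis(NANOSECONDS.toMillis(System.nanoTime() - startNs))
    res
  }

  override def next(): T = underlying.next()
}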