diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 54bee02e44e4..1a192e008b9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -98,6 +98,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val CACHE_CODEGEN = buildConf("spark.sql.inMemoryColumnarStorage.codegen")
+    .internal()
+    .doc("When true, use generated code to build column batches for caching. This is only " +
+      "supported for basic types and improves caching performance for such types.")
+    .booleanConf
+    .createWithDefault(true)
+
   val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
     .internal()
     .doc("When true, prefer sort merge join over shuffle hash join.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index e86116680a57..33c902f55e32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types.DataType
  */
 private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
+  val columnIndexes: Array[Int] = null
+
   val inMemoryTableScan: InMemoryTableScanExec = null
 
   override lazy val metrics = Map(
@@ -89,7 +91,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
     val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
       ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
+      val index = if (columnIndexes == null) i else columnIndexes(i)
+      s"$name = $batch.column($index);"
     }
 
     val nextBatch = ctx.freshName("nextBatch")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 14024d6c1055..de196c5f5948 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodeGenerator, UnsafeRowWriter}
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 import org.apache.spark.sql.types._
 
 /**
@@ -62,12 +63,16 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalR
 /**
  * Generates bytecode for a [[ColumnarIterator]] for columnar cache.
  */
-object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarIterator] with Logging {
+class GenerateColumnAccessor(useColumnarBatch: Boolean)
+  extends CodeGenerator[Seq[DataType], ColumnarIterator] with Logging {
 
   protected def canonicalize(in: Seq[DataType]): Seq[DataType] = in
   protected def bind(in: Seq[DataType], inputSchema: Seq[Attribute]): Seq[DataType] = in
 
   protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
+    if (useColumnarBatch) {
+      throw new UnsupportedOperationException("Not supported yet. Another PR will implement this.")
+    }
     val ctx = newCodeGenContext()
     val numFields = columnTypes.size
     val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
@@ -152,6 +157,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
     }
 
+    val cachedBatchBytesCls = classOf[CachedBatchBytes].getName
     val codeBody = s"""
       import java.nio.ByteBuffer;
       import java.nio.ByteOrder;
@@ -205,9 +211,9 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
             return false;
           }
 
-          ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
+          $cachedBatchBytesCls batch = ($cachedBatchBytesCls) input.next();
           currentRow = 0;
-          numRowsInBatch = batch.numRows();
+          numRowsInBatch = batch.getNumRows();
           for (int i = 0; i < columnIndexes.length; i ++) {
             buffers[i] = batch.buffers()[columnIndexes[i]];
           }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GeneratedColumnarBatch.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GeneratedColumnarBatch.scala
new file mode 100644
index 000000000000..d4371b049dd5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GeneratedColumnarBatch.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
+
+
+/**
+ * A helper class to expose the scala iterator to Java.
+ */
+abstract class ColumnarBatchIterator extends Iterator[ColumnarBatch]
+
+
+/**
+ * Generate code to batch [[InternalRow]]s into [[ColumnarBatch]]es.
+ */
+class GenerateColumnarBatch(
+    schema: StructType,
+    batchSize: Int,
+    storageLevel: StorageLevel)
+  extends CodeGenerator[Iterator[InternalRow], Iterator[CachedColumnarBatch]] {
+
+  protected def canonicalize(in: Iterator[InternalRow]): Iterator[InternalRow] = in
+
+  protected def bind(
+      in: Iterator[InternalRow], inputSchema: Seq[Attribute]): Iterator[InternalRow] = {
+    in
+  }
+
+  protected def create(rowIterator: Iterator[InternalRow]): Iterator[CachedColumnarBatch] = {
+    import scala.collection.JavaConverters._
+    val ctx = newCodeGenContext()
+    val columnStatsCls = classOf[ColumnStats].getName
+    val rowVar = ctx.freshName("row")
+    val batchVar = ctx.freshName("columnarBatch")
+    val rowNumVar = ctx.freshName("rowNum")
+    val numBytesVar = ctx.freshName("bytesInBatch")
+    ctx.addMutableState("long", numBytesVar, s"$numBytesVar = 0;")
+    val rowIterVar = ctx.addReferenceObj(
+      "rowIterator", rowIterator.asJava, classOf[java.util.Iterator[_]].getName)
+    val schemas = StructType(
+      schema.fields.map(s => StructField(s.name,
+        s.dataType match {
+          case udt: UserDefinedType[_] => udt.sqlType
+          case other => other
+        }, s.nullable))
+    )
+    val schemaVar = ctx.addReferenceObj("schema", schemas, classOf[StructType].getName)
+    val maxNumBytes = ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
+    val numColumns = schema.fields.length
+
+    val colStatVars = (0 to numColumns - 1).map(i => ctx.freshName("colStat" + i))
+    val colStatCode = ctx.splitExpressions(
+      (schemas.fields zip colStatVars).zipWithIndex.map {
+        case ((field, varName), i) =>
+          val columnStatsCls = field.dataType match {
+            case IntegerType => classOf[IntColumnStats].getName
+            case DoubleType => classOf[DoubleColumnStats].getName
+            case others => throw new UnsupportedOperationException(s"$others is not supported yet")
+          }
+          ctx.addMutableState(columnStatsCls, varName, "")
+          s"$varName = new $columnStatsCls(); statsArray[$i] = $varName;\n"
+      },
+      "apply",
+      Seq.empty
+    )
+
+    val populateColumnVectorsCode = ctx.splitExpressions(
+      (schemas.fields zip colStatVars).zipWithIndex.map {
+        case ((field, colStatVar), i) =>
+          GenerateColumnarBatch.putColumnCode(ctx, field.dataType, field.nullable,
+            batchVar, rowVar, rowNumVar, colStatVar, i, numBytesVar).trim + "\n"
+      },
+      "apply",
+      Seq(("InternalRow", rowVar), ("ColumnarBatch", batchVar), ("int", rowNumVar))
+    )
+
+    val code = s"""
+      import org.apache.spark.memory.MemoryMode;
+      import org.apache.spark.sql.catalyst.InternalRow;
+      import org.apache.spark.sql.execution.columnar.CachedColumnarBatch;
+      import org.apache.spark.sql.execution.columnar.GenerateColumnarBatch;
+      import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+      import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+      public GeneratedColumnarBatchIterator generate(Object[] references) {
+        return new GeneratedColumnarBatchIterator(references);
+      }
+
+      class GeneratedColumnarBatchIterator extends ${classOf[ColumnarBatchIterator].getName} {
+        private Object[] references;
+        ${ctx.declareMutableStates()}
+
+        public GeneratedColumnarBatchIterator(Object[] references) {
+          this.references = references;
+          ${ctx.initMutableStates()}
+        }
+
+        ${ctx.declareAddedFunctions()}
+
+        $columnStatsCls[] statsArray = new $columnStatsCls[$numColumns];
+        private void allocateColumnStats() {
+          ${colStatCode.trim}
+        }
+
+        @Override
+        public boolean hasNext() {
+          return $rowIterVar.hasNext();
+        }
+
+        @Override
+        public CachedColumnarBatch next() {
+          ColumnarBatch $batchVar =
+            ColumnarBatch.allocate($schemaVar, MemoryMode.ON_HEAP, $batchSize);
+          allocateColumnStats();
+          int $rowNumVar = 0;
+          $numBytesVar = 0;
+          while ($rowIterVar.hasNext() && $rowNumVar < $batchSize && $numBytesVar < $maxNumBytes) {
+            InternalRow $rowVar = (InternalRow) $rowIterVar.next();
+            $populateColumnVectorsCode
+            $rowNumVar += 1;
+          }
+          $batchVar.setNumRows($rowNumVar);
+          return CachedColumnarBatch.apply(
+            $batchVar, GenerateColumnarBatch.generateStats(statsArray));
+        }
+      }
+    """
+    val formattedCode = CodeFormatter.stripOverlappingComments(
+      new CodeAndComment(code, ctx.getPlaceHolderToComments()))
+    CodeGenerator.compile(formattedCode).generate(ctx.references.toArray)
+      .asInstanceOf[Iterator[CachedColumnarBatch]]
+  }
+}
+
+
+private[sql] object GenerateColumnarBatch {
+  def compressStorageLevel(storageLevel: StorageLevel, useCompression: Boolean): StorageLevel = {
+    if (!useCompression) return storageLevel
+    storageLevel match {
+      case MEMORY_ONLY => MEMORY_ONLY_SER
+      case MEMORY_ONLY_2 => MEMORY_ONLY_SER_2
+      case MEMORY_AND_DISK => MEMORY_AND_DISK_SER
+      case MEMORY_AND_DISK_2 => MEMORY_AND_DISK_SER_2
+      case sl => sl
+    }
+  }
+
+  def isCompress(storageLevel: StorageLevel) : Boolean = {
+    (storageLevel == MEMORY_ONLY_SER || storageLevel == MEMORY_ONLY_SER_2 ||
+      storageLevel == MEMORY_AND_DISK_SER || storageLevel == MEMORY_AND_DISK_SER_2)
+  }
+
+  private val typeToName = Map[AbstractDataType, String](
+    IntegerType -> "int",
+    DoubleType -> "double"
+  )
+
+  def putColumnCode(ctx: CodegenContext, dt: DataType, nullable: Boolean, batchVar: String,
+      rowVar: String, rowNumVar: String,
+      colStatVar: String, colNum: Int, numBytesVar: String): String = {
+    val colVar = s"$batchVar.column($colNum)"
+    val body = dt match {
+      case t if ctx.isPrimitiveType(dt) =>
+        val typeName = GenerateColumnarBatch.typeToName(dt)
+        val put = "put" + typeName.capitalize
+        val get = "get" + typeName.capitalize
+        s"""
+          $typeName val = $rowVar.$get($colNum);
+          $colVar.$put($rowNumVar, val);
+          $numBytesVar += ${dt.defaultSize};
+          $colStatVar.gatherValueStats(val);
+        """
+      case _ =>
+        throw new UnsupportedOperationException("Unsupported data type " + dt.simpleString);
+    }
+    if (nullable) {
+      s"""
+        if ($rowVar.isNullAt($colNum)) {
+          $colVar.putNull($rowNumVar);
+          $colStatVar.gatherNullStats();
+        } else {
+          ${body.trim}
+        }
+      """
+    } else {
+      s"""
+        {
+          ${body.trim}
+        }
+      """
+    }
+  }
+
+  def generateStats(columnStats: Array[ColumnStats]): InternalRow = {
+    val array = columnStats.flatMap(_.collectedStatistics)
+    InternalRow.fromSeq(array)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 456a8f3b20f3..67fce5aa38f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.LongAccumulator
 
@@ -44,14 +46,36 @@ object InMemoryRelation {
 
 
 /**
- * CachedBatch is a cached batch of rows.
+ * An abstract representation of a cached batch of rows.
+ */
+private[columnar] trait CachedBatch {
+  val stats: InternalRow
+  def getNumRows(): Int
+}
+
+
+/**
+ * A cached batch of rows stored as a list of byte arrays, one for each column.
  *
  * @param numRows The total number of rows in this batch
  * @param buffers The buffers for serialized columns
  * @param stats The stat of columns
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+private[columnar] case class CachedBatchBytes(
+    numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+  extends CachedBatch {
+  def getNumRows(): Int = numRows
+}
+
+
+/**
+ * A cached batch of rows stored as a [[ColumnarBatch]].
+ */
+private[columnar] case class CachedColumnarBatch(columnarBatch: ColumnarBatch, stats: InternalRow)
+  extends CachedBatch {
+  def getNumRows(): Int = columnarBatch.numRows()
+}
+
 
 case class InMemoryRelation(
     output: Seq[Attribute],
@@ -64,6 +88,32 @@ case class InMemoryRelation(
     val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
 
+  private def numOfNestedFields(dataType: DataType): Int = dataType match {
+    case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
+    case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
+    case a: ArrayType => numOfNestedFields(a.elementType)
+    case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
+    case _ => 1
+  }
+
+  /**
+   * If true, store the input rows using [[CachedColumnarBatch]]es, which are generally faster.
+   * If false, store the input rows using [[CachedBatchBytes]].
+   */
+  private[columnar] val useColumnarBatches: Boolean = {
+    // In the initial implementation, for ease of review, only integer and double columns are
+    // supported, and the number of fields must not exceed wholeStageMaxNumFields.
+    val schema = StructType.fromAttributes(child.output)
+    schema.fields.find(f => f.dataType match {
+      case IntegerType => false
+      case DoubleType => false
+      case _ => true
+    }).isEmpty &&
+      !(children.map(p => numOfNestedFields(p.schema))
+        .exists(_ > child.sqlContext.conf.wholeStageMaxNumFields)) &&
+      child.sqlContext.conf.getConf(SQLConf.CACHE_CODEGEN)
+  }
+
   override def innerChildren: Seq[SparkPlan] = Seq(child)
 
   override def producedAttributes: AttributeSet = outputSet
@@ -80,17 +130,33 @@ case class InMemoryRelation(
     }
   }
 
-  // If the cached column buffers were not passed in, we calculate them in the constructor.
-  // As in Spark, the actual work of caching is lazy.
-  if (_cachedColumnBuffers == null) {
-    buildBuffers()
+  /**
+   * Batch the input rows into [[CachedBatch]]es.
+   */
+  private def buildColumnBuffers: RDD[CachedBatch] = {
+    val buffers =
+      if (useColumnarBatches) {
+        buildColumnarBatches()
+      } else {
+        buildColumnBytes()
+      }
+    buffers.setName(
+      tableName.map { n => s"In-memory table $n" }
+        .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
+    buffers.asInstanceOf[RDD[CachedBatch]]
   }
 
-  private def buildBuffers(): Unit = {
+  /**
+   * Batch the input rows into [[CachedBatchBytes]] built using [[ColumnBuilder]]s.
+   *
+   * This handles complex types and compression, but is more expensive than
+   * [[buildColumnarBatches]], which generates code to build the buffers.
+   */
+  private def buildColumnBytes(): RDD[CachedBatchBytes] = {
     val output = child.output
-    val cached = child.execute().mapPartitionsInternal { rowIterator =>
-      new Iterator[CachedBatch] {
-        def next(): CachedBatch = {
+    child.execute().mapPartitionsInternal { rowIterator =>
+      new Iterator[CachedBatchBytes] {
+        def next(): CachedBatchBytes = {
           val columnBuilders = output.map { attribute =>
             ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
           }.toArray
@@ -125,7 +191,7 @@ case class InMemoryRelation(
           val stats = InternalRow.fromSeq(
             columnBuilders.flatMap(_.columnStats.collectedStatistics))
 
-          CachedBatch(rowCount, columnBuilders.map { builder =>
+          CachedBatchBytes(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)
         }
@@ -133,11 +199,46 @@ case class InMemoryRelation(
         def hasNext: Boolean = rowIterator.hasNext
       }
     }.persist(storageLevel)
+  }
 
-    cached.setName(
-      tableName.map(n => s"In-memory table $n")
-        .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
-    _cachedColumnBuffers = cached
+  /**
+   * Batch the input rows using [[ColumnarBatch]]es.
+   *
+   * Compared with [[buildColumnBytes]], this provides faster in-memory scans because both
+   * the read path and the write path are code-generated.
+   * However, it does not compress the data for now.
+   */
+  private def buildColumnarBatches(): RDD[CachedColumnarBatch] = {
+    val schema = StructType.fromAttributes(child.output)
+    val newStorageLevel = GenerateColumnarBatch.compressStorageLevel(storageLevel, useCompression)
+    child.execute().mapPartitionsInternal { rows =>
+      new GenerateColumnarBatch(schema, batchSize, newStorageLevel).generate(rows).map {
+        cachedColumnarBatch => {
+          var i = 0
+          var totalSize = 0L
+          while (i < cachedColumnarBatch.columnarBatch.numCols()) {
+            totalSize += cachedColumnarBatch.stats.getLong(4 + i * 5)
+            i += 1
+          }
+          batchStats.add(totalSize)
+          cachedColumnarBatch
+        }
+      }
+    }.persist(storageLevel)
+  }
+
+  // If the cached column buffers were not passed in, we calculate them in the constructor.
+  // As in Spark, the actual work of caching is lazy.
+  if (_cachedColumnBuffers == null) {
+    _cachedColumnBuffers = buildColumnBuffers
+  }
+
+  def recache(): Unit = {
+    if (_cachedColumnBuffers != null) {
+      _cachedColumnBuffers.unpersist()
+      _cachedColumnBuffers = null
+    }
+    _cachedColumnBuffers = buildColumnBuffers
+  }
 
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
@@ -158,7 +259,15 @@ case class InMemoryRelation(
       batchStats).asInstanceOf[this.type]
   }
 
-  def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
+  /**
+   * Return the cached batches of rows from the original plan, building them lazily if needed.
+   */
+  def cachedColumnBuffers: RDD[CachedBatch] = {
+    if (_cachedColumnBuffers == null) {
+      _cachedColumnBuffers = buildColumnBuffers
+    }
+    _cachedColumnBuffers
+  }
 
   override protected def otherCopyArgs: Seq[AnyRef] = Seq(_cachedColumnBuffers, batchStats)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 1d601374de13..9946f2c0481b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
-import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 
@@ -32,12 +32,53 @@ case class InMemoryTableScanExec(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
     @transient relation: InMemoryRelation)
-  extends LeafExecNode {
+  extends LeafExecNode with ColumnarBatchScan {
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+  override val columnIndexes =
+    attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray
+
+  override val inMemoryTableScan = this
+
+  override val supportCodegen: Boolean = relation.useColumnarBatches
 
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    if (relation.useColumnarBatches) {
+      val schema = relation.partitionStatistics.schema
+      val schemaIndex = schema.zipWithIndex
+      val buffers = relation.cachedColumnBuffers.asInstanceOf[RDD[CachedColumnarBatch]]
+      val prunedBuffers = if (inMemoryPartitionPruningEnabled) {
+        buffers.mapPartitionsInternal { cachedColumnarBatchIterator =>
+          val partitionFilter = newPredicate(
+            partitionFilters.reduceOption(And).getOrElse(Literal(true)), schema)
+
+          // Do partition batch pruning if enabled
+          cachedColumnarBatchIterator.filter { cachedColumnarBatch =>
+            if (!partitionFilter.eval(cachedColumnarBatch.stats)) {
+              def statsString: String = schemaIndex.map {
+                case (a, i) =>
+                  val value = cachedColumnarBatch.stats.get(i, a.dataType)
+                  s"${a.name}: $value"
+              }.mkString(", ")
+              logInfo(s"Skipping partition based on stats $statsString")
+              false
+            } else {
+              true
+            }
+          }
+        }
+      } else {
+        buffers
+      }
+
+      // HACK ALERT: This is actually an RDD[ColumnarBatch].
+      // We're taking advantage of Scala's type erasure here to pass these batches along.
+      Seq(prunedBuffers.map(_.columnarBatch).asInstanceOf[RDD[InternalRow]])
+    } else {
+      Seq()
+    }
+  }
+
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
   override def output: Seq[Attribute] = attributes
 
@@ -148,6 +189,7 @@ case class InMemoryTableScanExec(
     val schemaIndex = schema.zipWithIndex
     val relOutput: AttributeSeq = relation.output
     val buffers = relation.cachedColumnBuffers
+    val useColumnarBatches = relation.useColumnarBatches
 
     buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) =>
       val partitionFilter = newPredicate(
@@ -186,7 +228,7 @@ case class InMemoryTableScanExec(
           if (enableAccumulators) {
             readBatches.add(1)
           }
-          numOutputRows += batch.numRows
+          numOutputRows += batch.getNumRows()
           batch
         }
 
@@ -194,7 +236,8 @@ case class InMemoryTableScanExec(
         case udt: UserDefinedType[_] => udt.sqlType
         case other => other
       }.toArray
-      val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+      val columnarIterator = new GenerateColumnAccessor(useColumnarBatches)
+        .generate(columnTypes)
       columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
       if (enableAccumulators && columnarIterator.hasNext) {
         readPartitions.add(1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 109b1d9db60d..6ac73786cc8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -311,12 +311,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
     val length1 = 3999
     val columnTypes1 = List.fill(length1)(IntegerType)
-    val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)
+    val columnarIterator1 = new GenerateColumnAccessor(false).generate(columnTypes1)
 
     // SPARK-16664: the limit of janino is 8117
     val length2 = 8117
     val columnTypes2 = List.fill(length2)(IntegerType)
-    val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
+    val columnarIterator2 = new GenerateColumnAccessor(false).generate(columnTypes2)
   }
 
   test("SPARK-17549: cached table size should be correctly calculated") {
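Usage note (not part of the patch): the sketch below shows one way a reviewer might exercise the new code-generated cache path locally. It assumes a local SparkSession and an int/double-only schema, since those are the only types the generated ColumnarBatch path accepts in this initial version; the object name, column names, and master setting are illustrative only. It also assumes, per the stats accounting in buildColumnarBatches, that each column contributes five flattened stats fields (lower bound, upper bound, null count, count, size in bytes), so stats.getLong(4 + i * 5) picks the size-in-bytes slot of column i.

// Reviewer sketch only -- not part of the diff. Assumes a Spark build with this patch applied.
import org.apache.spark.sql.SparkSession

object CacheCodegenSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-codegen-smoke-test")
      .getOrCreate()
    import spark.implicits._

    // The new path is gated by the internal flag added to SQLConf above; schemas containing
    // anything other than int/double columns fall back to the existing CachedBatchBytes path.
    spark.conf.set("spark.sql.inMemoryColumnarStorage.codegen", "true")

    val df = spark.range(0, 1000000)
      .select($"id".cast("int").as("i"), ($"id" * 0.5).as("d"))
    df.cache()                          // marks the relation for caching; batches are built lazily
    df.count()                          // materializes the cached batches
    df.filter($"i" > 500000).count()    // should read the cache via the ColumnarBatchScan codegen path
    spark.stop()
  }
}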