Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.{DataType, DataTypes}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove these unnecessary imports?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, done



/**
Expand All @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.DataType
*/
private[sql] trait ColumnarBatchScan extends CodegenSupport {

val inMemoryTableScan: InMemoryTableScanExec = null
val columnIndexes: Array[Int] = null

def vectorTypes: Option[Seq[String]] = None

Expand Down Expand Up @@ -84,25 +84,45 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
val columnarBatchClz = classOf[ColumnarBatch].getName
val batch = ctx.freshName("batch")
ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
val cachedBatchClz = "org.apache.spark.sql.execution.columnar.CachedBatch"
val cachedBatch = ctx.freshName("cachedBatch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, removed


val idx = ctx.freshName("batchIdx")
ctx.addMutableState("int", idx, s"$idx = 0;")
val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
val columnVectorClzs = vectorTypes.getOrElse(
Seq.fill(colVars.size)(classOf[ColumnVector].getName))
val columnAccessorClz = "org.apache.spark.sql.execution.columnar.ColumnAccessor"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This table-cache-specific code should go in InMemoryTableScanExec instead of here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, done

val writableColumnVectorClz = classOf[WritableColumnVector].getName
val dataTypesClz = classOf[DataTypes].getName
val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
case ((name, columnVectorClz), i) =>
ctx.addMutableState(columnVectorClz, name, s"$name = null;")
s"$name = ($columnVectorClz) $batch.column($i);"
val index = if (columnIndexes == null) i else columnIndexes(i)
s"$name = ($columnVectorClz) $batch.column($index);" + (if (columnIndexes == null) "" else {
val dt = output.attrs(i).dataType
s"\n$columnAccessorClz$$.MODULE$$.decompress(" +
s"$cachedBatch.buffers()[$index], ($writableColumnVectorClz) $name, " +
s"$dataTypesClz.$dt, $cachedBatch.numRows());"
})
}

val assignBatch = if (columnIndexes == null) {
s"$batch = ($columnarBatchClz)$input.next();"
} else {
val inMemoryRelationClz = classOf[InMemoryRelation].getName
s"""
$cachedBatchClz $cachedBatch = ($cachedBatchClz)$input.next();
$batch = $inMemoryRelationClz$$.MODULE$$.createColumn($cachedBatch);
"""
}
val nextBatch = ctx.freshName("nextBatch")
val nextBatchFuncName = ctx.addNewFunction(nextBatch,
s"""
|private void $nextBatch() throws java.io.IOException {
| long getBatchStart = System.nanoTime();
| if ($input.hasNext()) {
| $batch = ($columnarBatchClz)$input.next();
| $assignBatch
| $numOutputRows.add($batch.numRows());
| $idx = 0;
| ${columnAssigns.mkString("", "\n", "\n")}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp

object WholeStageCodegenExec {
  val PIPELINE_DURATION_METRIC = "duration"

  /**
   * Counts the leaf fields reachable from `dataType`.
   *
   * Structs contribute the sum over their fields, maps the sum of their key and value types,
   * arrays the count of their element type, and user-defined types the count of their
   * underlying SQL type. Every other type counts as a single field.
   */
  private def numOfNestedFields(dataType: DataType): Int = dataType match {
    case struct: StructType =>
      struct.fields.foldLeft(0) { (total, field) =>
        total + numOfNestedFields(field.dataType)
      }
    case map: MapType =>
      Seq(map.keyType, map.valueType).map(numOfNestedFields).sum
    case array: ArrayType =>
      numOfNestedFields(array.elementType)
    case udt: UserDefinedType[_] =>
      numOfNestedFields(udt.sqlType)
    case _ =>
      1
  }

  /**
   * Tells whether `dataType` has more nested leaf fields than `conf.wholeStageMaxNumFields`.
   *
   * @param conf     the configuration supplying the `wholeStageMaxNumFields` limit
   * @param dataType the type (typically a plan's schema) whose leaf fields are counted
   * @return true only when the leaf-field count is strictly greater than the limit
   */
  def isTooManyFields(conf: SQLConf, dataType: DataType): Boolean = {
    val fieldCount = numOfNestedFields(dataType)
    fieldCount > conf.wholeStageMaxNumFields
  }
}

/**
Expand Down Expand Up @@ -490,22 +502,14 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case _ => true
}

/**
 * Counts the leaf fields reachable from `dataType`: structs, maps, arrays and user-defined
 * types are descended into recursively, and every other type counts as one field.
 */
private def numOfNestedFields(dataType: DataType): Int = dataType match {
  case structType: StructType =>
    structType.fields.iterator.map(field => numOfNestedFields(field.dataType)).sum
  case mapType: MapType =>
    val keyFields = numOfNestedFields(mapType.keyType)
    val valueFields = numOfNestedFields(mapType.valueType)
    keyFields + valueFields
  case arrayType: ArrayType =>
    numOfNestedFields(arrayType.elementType)
  case udt: UserDefinedType[_] =>
    numOfNestedFields(udt.sqlType)
  case _ =>
    1
}

private def supportCodegen(plan: SparkPlan): Boolean = plan match {
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
val hasTooManyOutputFields =
numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
WholeStageCodegenExec.isTooManyFields(conf, plan.schema)
val hasTooManyInputFields =
plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
plan.children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isDefined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

find(...).isDefined -> exists?

!willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,12 @@ private[sql] object ColumnAccessor {
throw new RuntimeException("Not support non-primitive type now")
}
}

/**
 * Decompresses one serialized column, given as a raw byte array, into `columnVector`.
 *
 * The array is wrapped in a `ByteBuffer`, a `ColumnAccessor` for `dataType` is built over it,
 * and the work is delegated to the accessor-based `decompress` overload.
 *
 * @param array        the serialized bytes of a single column
 * @param columnVector the writable vector that receives the decoded values
 * @param dataType     the column's data type, used to select the accessor
 * @param numRows      the number of rows to pass on to the accessor-based overload
 */
def decompress(
    array: Array[Byte],
    columnVector: WritableColumnVector,
    dataType: DataType,
    numRows: Int): Unit = {
  val accessor = ColumnAccessor(dataType, ByteBuffer.wrap(array))
  decompress(accessor, columnVector, numRows)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, ColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.LongAccumulator

Expand All @@ -39,6 +41,16 @@ object InMemoryRelation {
child: SparkPlan,
tableName: Option[String]): InMemoryRelation =
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()

/**
 * Allocates a new [[ColumnarBatch]] sized for the given cached batch.
 *
 * Only allocation happens here: column vectors are obtained from
 * `OnHeapColumnVector.allocateColumns(rowCount, schema)` and the batch's row count is set to
 * the cached batch's `numRows`. The cached batch's `buffers` are not read by this method, so
 * decoding the column data into the vectors is left to the caller.
 *
 * NOTE(review): `CachedBatch.schema` is populated with `null` in `buildBuffers` when columnar
 * batches are not in use; this method presumably must only be called on the columnar path.
 *
 * @param cachedColumnarBatch the cached batch supplying the row count and schema
 * @return a freshly allocated batch whose row count is `cachedColumnarBatch.numRows`
 */
def createColumn(cachedColumnarBatch: CachedBatch): ColumnarBatch = {
  val rowCount = cachedColumnarBatch.numRows
  val schema = cachedColumnarBatch.schema
  val columnVectors = OnHeapColumnVector.allocateColumns(rowCount, schema)
  // Scala arrays are invariant, so the allocated array is cast to the element type that the
  // ColumnarBatch constructor is given here.
  val columnarBatch = new ColumnarBatch(
    schema, columnVectors.asInstanceOf[Array[ColumnVector]], rowCount)
  columnarBatch.setNumRows(rowCount)
  // The last expression is the result; no explicit `return` is needed.
  columnarBatch
}
}


Expand All @@ -48,9 +60,11 @@ object InMemoryRelation {
* @param numRows The total number of rows in this batch
* @param buffers The buffers for serialized columns
* @param stats The stat of columns
* @param schema The schema of columns
*/
private[columnar]
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
case class CachedBatch(
numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow, schema: StructType)

case class InMemoryRelation(
output: Seq[Attribute],
Expand All @@ -63,6 +77,23 @@ case class InMemoryRelation(
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
extends logical.LeafNode with MultiInstanceRelation {

/**
* If true, get data from ColumnVector in ColumnarBatch, which is generally faster.
* If false, get data from UnsafeRow built from ColumnVector.
*/
private[columnar] val useColumnarBatches: Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should belong to the read path. Whether we use columnar batch scan or not, the write path doesn't need to change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. The previous implementation required changing the execution path in the write path, too. After the review comments, this flag is no longer referenced in InMemoryRelation. I will move this flag to InMemoryTableScanExec.

// In the initial implementation, for ease of review, support only primitive data types
// and only when the number of fields does not exceed wholeStageMaxNumFields
val schema = StructType.fromAttributes(child.output)
schema.fields.find(f => f.dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType |
FloatType | DoubleType => false
case _ => true
}).isEmpty &&
!WholeStageCodegenExec.isTooManyFields(conf, child.schema) &&
children.find(p => WholeStageCodegenExec.isTooManyFields(conf, p.schema)).isEmpty
}

override protected def innerChildren: Seq[SparkPlan] = Seq(child)

override def producedAttributes: AttributeSet = outputSet
Expand All @@ -87,6 +118,7 @@ case class InMemoryRelation(

private def buildBuffers(): Unit = {
val output = child.output
val useColumnarBatch = useColumnarBatches
val cached = child.execute().mapPartitionsInternal { rowIterator =>
new Iterator[CachedBatch] {
def next(): CachedBatch = {
Expand Down Expand Up @@ -126,7 +158,8 @@ case class InMemoryRelation(
columnBuilders.flatMap(_.columnStats.collectedStatistics))
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}, stats,
if (useColumnarBatch) StructType.fromAttributes(output) else null)
}

def hasNext: Boolean = rowIterator.hasNext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,37 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.UserDefinedType


case class InMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
@transient relation: InMemoryRelation)
extends LeafExecNode {
extends LeafExecNode with ColumnarBatchScan {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override def vectorTypes: Option[Seq[String]] =
Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName))

override val columnIndexes =
attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray

override val supportCodegen: Boolean = relation.useColumnarBatches

override def inputRDDs(): Seq[RDD[InternalRow]] = {
if (supportCodegen) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If supportCodegen is false, I think we never call inputRDDs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I will insert an assertion.

val buffers = relation.cachedColumnBuffers
// HACK ALERT: This is actually an RDD[CachedBatch].
// We're taking advantage of Scala's type erasure here to pass these batches along.
Seq(buffers.asInstanceOf[RDD[InternalRow]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnarBatchScan assumes the input RDD is RDD[ColumnarBatch], you are breaking this assumption here.

I think we should convert CachedBatch to ColumnarBatch first, you can codegen a class to do it if necessary.

Copy link
Member Author

@kiszk kiszk Oct 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I break that assumption (by passing an RDD[CachedBatch]), since the ColumnarBatch has to be created when the data is read.
Should we convert CachedBatch to ColumnarBatch here in inputRDDs()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes please

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile sure, done

} else {
Seq()
}
}

override def output: Seq[Attribute] = attributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,27 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
val df = spark.createDataFrame(data, schema)
assert(df.select("b").first() === Row(outerStruct))
}

// Persists a single row holding one value of each primitive type (plus a null integer) and
// checks that reading the DataFrame returns the same row that was taken from the cache.
test("primitive data type accesses in persist data") {
  val values = Seq[Any](true, 1.toByte, 3.toShort, 7, 15L, 31.25f, 63.75, null)
  val fieldTypes = Seq(
    BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, IntegerType)
  val fields = fieldTypes.zipWithIndex.map { case (fieldType, i) =>
    StructField(s"col$i", fieldType, nullable = true)
  }
  val singleRowRdd = sparkContext.makeRDD(Seq(Row.fromSeq(values)))
  val df = spark.createDataFrame(singleRowRdd, StructType(fields))
  val firstRows = df.persist().take(1)
  checkAnswer(df, firstRows(0))
}

// Materializes a cached DataFrame once, then runs two different filters against it to verify
// that repeated reads of the same cache return correct results.
test("access cache multiple times") {
  val cached = sparkContext.parallelize(Seq(1, 2, 3), 1).toDF("x").cache()
  cached.count()

  val greaterThanOne = cached.filter("x > 1")
  checkAnswer(greaterThanOne, Seq(Row(2), Row(3)))

  val greaterThanTwo = cached.filter("x > 2")
  checkAnswer(greaterThanTwo, Row(3))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.expressions.scalalang.typed
Expand Down Expand Up @@ -117,6 +118,37 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}

// Verifies that a filter over a cached primitive-typed Dataset is compiled into a
// WholeStageCodegenExec whose FilterExec reads directly from a codegen-enabled
// InMemoryTableScanExec, and that a cached string-typed Dataset is not.
test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") {
  import testImplicits._

  val cachedLongs = spark.range(3).cache()
  cachedLongs.count()
  val filteredLongs = cachedLongs.filter(_ > 0)
  val longPlan = filteredLongs.queryExecution.executedPlan
  // Look for WholeStageCodegen(Filter(InMemoryTableScan)) where the scan supports codegen.
  val codegenScanStage = longPlan.find {
    case stage: WholeStageCodegenExec =>
      stage.child match {
        case filter: FilterExec =>
          filter.child match {
            case scan: InMemoryTableScanExec => scan.supportCodegen
            case _ => false
          }
        case _ => false
      }
    case _ => false
  }
  assert(codegenScanStage.isDefined)
  assert(filteredLongs.collect() === Array(1, 2))

  // The string type is not supported by the InMemoryTableScanExec codegen path, so the
  // FilterExec inside the codegen stage must not sit directly on top of the scan.
  val cachedStrings = spark.range(3).map(_.toString).cache()
  cachedStrings.count()
  val filteredStrings = cachedStrings.filter(_ == "1")
  val stringPlan = filteredStrings.queryExecution.executedPlan
  val stageWithoutDirectScan = stringPlan.find {
    case stage: WholeStageCodegenExec =>
      stage.child match {
        case filter: FilterExec =>
          filter.child match {
            case _: InMemoryTableScanExec => false
            case _ => true
          }
        case _ => false
      }
    case _ => false
  }
  assert(stageWithoutDirectScan.isDefined)
  assert(filteredStrings.collect() === Array("1"))
}

test("SPARK-19512 codegen for comparing structs is incorrect") {
// this would raise CompileException before the fix
spark.range(10)
Expand Down