Skip to content

Commit f2a7890

Browse files
committed
Use SpecificMutableRow in InMemoryColumnarTableScan to avoid boxing
1 parent 9cf30b0 commit f2a7890

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private[sql] case class InMemoryRelation(
5252
// As in Spark, the actual work of caching is lazy.
5353
if (_cachedColumnBuffers == null) {
5454
val output = child.output
55-
val cached = child.execute().mapPartitions { baseIterator =>
55+
val cached = child.execute().mapPartitions { rowIterator =>
5656
new Iterator[CachedBatch] {
5757
def next() = {
5858
val columnBuilders = output.map { attribute =>
@@ -64,8 +64,8 @@ private[sql] case class InMemoryRelation(
6464
var row: Row = null
6565
var rowCount = 0
6666

67-
while (baseIterator.hasNext && rowCount < batchSize) {
68-
row = baseIterator.next()
67+
while (rowIterator.hasNext && rowCount < batchSize) {
68+
row = rowIterator.next()
6969
var i = 0
7070
while (i < row.length) {
7171
columnBuilders(i).appendFrom(row, i)
@@ -80,7 +80,7 @@ private[sql] case class InMemoryRelation(
8080
CachedBatch(columnBuilders.map(_.build()), stats)
8181
}
8282

83-
def hasNext = baseIterator.hasNext
83+
def hasNext = rowIterator.hasNext
8484
}
8585
}.cache()
8686

@@ -196,11 +196,20 @@ private[sql] case class InMemoryColumnarTableScan(
196196
partitionFilters.reduceOption(And).getOrElse(Literal(true)),
197197
relation.partitionStatistics.schema)
198198

199-
// Find the ordinals of the requested columns. If none are requested, use the first.
200-
val requestedColumns = if (attributes.isEmpty) {
201-
Seq(0)
199+
// Find the ordinals and data types of the requested columns. If none are requested, use the
200+
// narrowest (the field with minimum default element size).
201+
val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
202+
val (narrowestOrdinal, narrowestDataType) =
203+
relation.output.zipWithIndex.map { case (a, ordinal) =>
204+
ordinal -> a.dataType
205+
} minBy { case (_, dataType) =>
206+
ColumnType(dataType).defaultSize
207+
}
208+
Seq(narrowestOrdinal) -> Seq(narrowestDataType)
202209
} else {
203-
attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
210+
attributes.map { a =>
211+
relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
212+
}.unzip
204213
}
205214

206215
val rows = iterator
@@ -220,11 +229,11 @@ private[sql] case class InMemoryColumnarTableScan(
220229
}
221230
// Build column accessors
222231
.map { cachedBatch =>
223-
requestedColumns.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
232+
requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
224233
}
225234
// Extract rows via column accessors
226235
.flatMap { columnAccessors =>
227-
val nextRow = new GenericMutableRow(columnAccessors.length)
236+
val nextRow = new SpecificMutableRow(requestedColumnDataTypes)
228237
new Iterator[Row] {
229238
override def next() = {
230239
var i = 0

0 commit comments

Comments
 (0)