
Commit 7dbca68

liancheng authored and pwendell committed
[BUGFIX] In-memory columnar storage bug fixes
Fixed several bugs of in-memory columnar storage to make `HiveInMemoryCompatibilitySuite` pass.

@rxin @marmbrus It is reasonable to include `HiveInMemoryCompatibilitySuite` in this PR, but I didn't, since it significantly increases test execution time. What do you think?

**UPDATE** `HiveCompatibilitySuite` has been made to cache tables in memory. `HiveInMemoryCompatibilitySuite` was removed.

Author: Cheng Lian <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes #374 from liancheng/inMemBugFix and squashes the following commits:

6ad6d9b [Cheng Lian] Merged HiveCompatibilitySuite and HiveInMemoryCompatibilitySuite
5bdbfe7 [Cheng Lian] Revert 882c538 & 8426ddc, which introduced regression
882c538 [Cheng Lian] Remove attributes field from InMemoryColumnarTableScan
32cc9ce [Cheng Lian] Code style cleanup
99382bf [Cheng Lian] Enable compression by default
4390bcc [Cheng Lian] Report error for any Throwable in HiveComparisonTest
d1df4fd [Michael Armbrust] Remove test tables that might always get created anyway?
ab9e807 [Michael Armbrust] Fix the logged console version of failed test cases to use the new syntax.
1965123 [Michael Armbrust] Don't use coalesce for gathering all data to a single partition, as it does not work correctly with mutable rows.
e36cdd0 [Michael Armbrust] Spelling.
2d0e168 [Michael Armbrust] Run Hive tests in-memory too.
6360723 [Cheng Lian] Made PreInsertionCasts support SparkLogicalPlan and InMemoryColumnarTableScan
c9b0f6f [Cheng Lian] Let InsertIntoTable support InMemoryColumnarTableScan
9c8fc40 [Cheng Lian] Disable compression by default
e619995 [Cheng Lian] Bug fix: incorrect byte order in CompressionScheme.columnHeaderSize
8426ddc [Cheng Lian] Bug fix: InMemoryColumnarTableScan should cache columns specified by the attributes argument
036cd09 [Cheng Lian] Clean up unused imports
44591a5 [Cheng Lian] Bug fix: NullableColumnAccessor.hasNext must take nulls into account
052bf41 [Cheng Lian] Bug fix: should only gather compressibility info for non-null values
95b3301 [Cheng Lian] Fixed bugs in IntegralDelta
1 parent 037fe4d commit 7dbca68

17 files changed: +109 -66 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ class SchemaRDD(
   def baseSchemaRDD = this

   // =========================================================================================
-  // RDD functions: Copy the interal row representation so we present immutable data to users.
+  // RDD functions: Copy the internal row representation so we present immutable data to users.
   // =========================================================================================

   override def compute(split: Partition, context: TaskContext): Iterator[Row] =

sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala

Lines changed: 2 additions & 0 deletions
@@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor {

     pos += 1
   }
+
+  abstract override def hasNext = seenNulls < nullCount || super.hasNext
 }
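Why this fix works: the data section of a column buffer only holds non-null values, so an accessor that delegates `hasNext` to the underlying buffer runs dry before all logical rows (nulls included) have been produced. A minimal standalone sketch of the idea, with hypothetical member names standing in for the accessor's internals:

    // Sketch only: nonNullsRemaining, nullCount and seenNulls are stand-ins for the
    // accessor's real state; the point is that hasNext must combine both sources.
    trait NullAwareAccessorSketch {
      def nonNullsRemaining: Boolean // what super.hasNext reports (data buffer not empty)
      def nullCount: Int             // total nulls recorded in the column header
      def seenNulls: Int             // nulls already emitted so far

      // A column with 4 values and 4 nulls has 8 logical rows; hasNext must stay
      // true until both the null budget and the data buffer are exhausted.
      def hasNext: Boolean = seenNulls < nullCount || nonNullsRemaining
    }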

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala

Lines changed: 3 additions & 1 deletion
@@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

   abstract override def appendFrom(row: Row, ordinal: Int) {
     super.appendFrom(row, ordinal)
-    gatherCompressibilityStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      gatherCompressibilityStats(row, ordinal)
+    }
   }

   abstract override def build() = {

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala

Lines changed: 2 additions & 2 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.columnar.compression

-import java.nio.ByteBuffer
+import java.nio.{ByteOrder, ByteBuffer}

 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
@@ -84,7 +84,7 @@ private[sql] object CompressionScheme {
   }

   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
-    val header = columnBuffer.duplicate()
+    val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
     val nullCount = header.getInt(4)
     // Column type ID + null count + null positions
     4 + 4 + 4 * nullCount
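The byte-order fix matters because `ByteBuffer.duplicate()` shares the buffer's content but the duplicate always starts out big-endian, regardless of the order set on the original. Since the column header is written in native byte order, reading `getInt(4)` from an un-reordered duplicate returns a scrambled null count and therefore a wrong header size (4 bytes type ID + 4 bytes null count + 4 bytes per null position). A standalone illustration of the pitfall, not project code:

    import java.nio.{ByteBuffer, ByteOrder}

    // Write a little-endian int, as a natively ordered buffer would on x86.
    val original = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
    original.putInt(0, 3) // e.g. a null count of 3

    val dup = original.duplicate()    // byte order silently resets to BIG_ENDIAN
    val wrong = dup.getInt(0)         // 50331648 (0x03000000 read big-endian), not 3
    val right = dup.order(ByteOrder.LITTLE_ENDIAN).getInt(0) // 3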

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala

Lines changed: 8 additions & 12 deletions
@@ -397,26 +397,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp

       if (initial) {
         initial = false
-        prev = value
         _compressedSize += 1 + columnType.defaultSize
       } else {
         val (smallEnough, _) = byteSizedDelta(value, prev)
         _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
       }
+
+      prev = value
     }

     override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
       to.putInt(typeId)

       if (from.hasRemaining) {
-        val prev = columnType.extract(from)
-
+        var prev = columnType.extract(from)
         to.put(Byte.MinValue)
         columnType.append(prev, to)

         while (from.hasRemaining) {
           val current = columnType.extract(from)
           val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current

           if (smallEnough) {
             to.put(delta)
@@ -443,13 +444,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp

     override def next() = {
       val delta = buffer.get()
-
-      if (delta > Byte.MinValue) {
-        addDelta(prev, delta)
-      } else {
-        prev = columnType.extract(buffer)
-        prev
-      }
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+      prev
     }

     override def hasNext = buffer.hasRemaining
@@ -465,7 +461,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {

   override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }

@@ -478,6 +474,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {

   override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
   }
 }
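The `byteSizedDelta` change closes a signed-overflow hole: the old predicate `delta < Byte.MaxValue` accepts any sufficiently negative delta even though it cannot be represented in one byte, while the new `math.abs(delta) <= Byte.MaxValue` restricts deltas to [-127, 127] and leaves `Byte.MinValue` free to serve as the "full value follows" escape marker written by `compress`. A worked example of the old behaviour:

    val prev = 1000
    val current = 700
    val delta = current - prev            // -300

    // Old check: -300 < Byte.MaxValue is true, so the delta was emitted as one byte,
    // but (-300).toByte == -44, silently corrupting the decompressed column.
    val truncated = delta.toByte          // -44

    // New check: math.abs(-300) <= Byte.MaxValue is false, so the full value is stored.
    val smallEnough = math.abs(delta) <= Byte.MaxValue // false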

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 8 additions & 1 deletion
@@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._1)

       case SinglePartition =>
-        child.execute().coalesce(1, shuffle = true)
+        val rdd = child.execute().mapPartitions { iter =>
+          val mutablePair = new MutablePair[Null, Row]()
+          iter.map(r => mutablePair.update(null, r))
+        }
+        val partitioner = new HashPartitioner(1)
+        val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner)
+        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+        shuffled.map(_._2)

       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
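The coalesce-based path was unsafe because SQL operators reuse a single mutable row per partition; gathering those rows into one partition without serializing them leaves many references to the same object. Routing the rows through a `ShuffledRDD` with `SparkSqlSerializer` forces each row to be serialized, and thus effectively copied, before it reaches the single output partition. A minimal plain-Scala illustration of the aliasing problem (no Spark involved):

    // One mutable object reused for every element, as SQL operators do per partition.
    final class ReusedRow { var value: Int = 0 }

    val reused = new ReusedRow
    val iter = (1 to 3).iterator.map { i => reused.value = i; reused }

    // Buffering the iterator keeps three references to the same object,
    // so every slot ends up showing the last value written.
    val buffered = iter.toArray
    buffered.map(_.value)                 // Array(3, 3, 3), not Array(1, 2, 3)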

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 2 additions & 2 deletions
@@ -70,8 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case InMemoryColumnarTableScan(output, child) =>
-          InMemoryColumnarTableScan(output.map(_.newInstance), child)
+        case scan @ InMemoryColumnarTableScan(output, child) =>
+          scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }
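Binding the matched instance and calling `copy` instead of re-invoking the constructor is the defensive choice here: `copy` carries over any constructor fields the pattern does not name, whereas a freshly constructed instance would fall back to their defaults. A generic Scala sketch with a hypothetical `Scan` case class (the `useCompression` field is purely illustrative, not the real plan node):

    case class Scan(attributes: Seq[String], child: String, useCompression: Boolean = false)

    val scan = Scan(Seq("a", "b"), "child", useCompression = true)

    // Re-invoking the constructor drops fields the pattern doesn't mention...
    val rebuilt = scan match {
      case Scan(attrs, child, _) => Scan(attrs.map(_.toUpperCase), child)
    }
    rebuilt.useCompression   // false -- silently reset to the default

    // ...while copy on the bound instance keeps them.
    val copied = scan match {
      case s @ Scan(attrs, _, _) => s.copy(attributes = attrs.map(_.toUpperCase))
    }
    copied.useCompression    // true -- preserved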

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -17,11 +17,10 @@

 package org.apache.spark.sql

-import org.scalatest.FunSuite
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext

 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.

sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala renamed to sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 11 additions & 1 deletion
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.columnar

 import org.apache.spark.sql.{QueryTest, TestData}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.test.TestSQLContext

-class ColumnarQuerySuite extends QueryTest {
+class InMemoryColumnarQuerySuite extends QueryTest {
   import TestData._
   import TestSQLContext._

@@ -32,6 +33,15 @@ class ColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }

+  test("projection") {
+    val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().map {
+      case Row(key: Int, value: String) => value -> key
+    }.toSeq)
+  }
+
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
     val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite {
       val row = new GenericMutableRow(1)

       (0 until 4).foreach { _ =>
+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row(0) === randomRow(0))

+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row.isNullAt(0))
       }
+
+      assert(!accessor.hasNext)
     }
   }
 }
