Skip to content

Commit d7c0e8f

Browse files
committed
Added test suite for IntegralDelta (IntDelta & LongDelta)
1 parent 3c1ad7a commit d7c0e8f

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,9 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
377377
*/
378378
protected def byteSizedDelta(x: I#JvmType, y: I#JvmType): (Boolean, Byte)
379379

380+
/**
381+
* Simply computes `x + delta`
382+
*/
380383
protected def addDelta(x: I#JvmType, delta: Byte): I#JvmType
381384

382385
class Encoder extends compression.Encoder[I] {
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.columnar.compression
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
23+
import org.apache.spark.sql.catalyst.types.IntegralType
24+
import org.apache.spark.sql.columnar._
25+
26+
class IntegralDeltaSuite extends FunSuite {
27+
testIntegralDelta(new IntColumnStats, INT, IntDelta)
28+
testIntegralDelta(new LongColumnStats, LONG, LongDelta)
29+
30+
def testIntegralDelta[I <: IntegralType](
31+
columnStats: NativeColumnStats[I],
32+
columnType: NativeColumnType[I],
33+
scheme: IntegralDelta[I]) {
34+
35+
def skeleton(input: Seq[I#JvmType]) {
36+
// -------------
37+
// Tests encoder
38+
// -------------
39+
40+
val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme)
41+
val deltas = if (input.isEmpty) {
42+
Seq.empty[Long]
43+
} else {
44+
(input.tail, input.init).zipped.map {
45+
case (x: Int, y: Int) => (x - y).toLong
46+
case (x: Long, y: Long) => x - y
47+
}
48+
}
49+
50+
input.map { value =>
51+
val row = new GenericMutableRow(1)
52+
columnType.setField(row, 0, value)
53+
builder.appendFrom(row, 0)
54+
}
55+
56+
val buffer = builder.build()
57+
// Column type ID + null count + null positions
58+
val headerSize = CompressionScheme.columnHeaderSize(buffer)
59+
60+
// Compression scheme ID + compressed contents
61+
val compressedSize = 4 + (if (deltas.isEmpty) {
62+
0
63+
} else {
64+
val oneBoolean = columnType.defaultSize
65+
1 + oneBoolean + deltas.map {
66+
d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
67+
}.sum
68+
})
69+
70+
// 4 extra bytes for compression scheme type ID
71+
expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
72+
73+
buffer.position(headerSize)
74+
expectResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())
75+
76+
if (input.nonEmpty) {
77+
expectResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
78+
expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))
79+
80+
(input.tail, deltas).zipped.foreach { (value, delta) =>
81+
if (delta < Byte.MaxValue) {
82+
expectResult(delta, "Wrong delta")(buffer.get())
83+
} else {
84+
expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
85+
expectResult(value, "Wrong value")(columnType.extract(buffer))
86+
}
87+
}
88+
}
89+
90+
// -------------
91+
// Tests decoder
92+
// -------------
93+
94+
// Rewinds, skips column header and 4 more bytes for compression scheme ID
95+
buffer.rewind().position(headerSize + 4)
96+
97+
val decoder = scheme.decoder(buffer, columnType)
98+
input.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
99+
assert(!decoder.hasNext)
100+
}
101+
102+
test(s"$scheme: empty column") {
103+
skeleton(Seq.empty)
104+
}
105+
106+
test(s"$scheme: simple case") {
107+
val input = columnType match {
108+
case INT => Seq(1: Int, 2: Int, 130: Int)
109+
case LONG => Seq(1: Long, 2: Long, 130: Long)
110+
}
111+
112+
skeleton(input.map(_.asInstanceOf[I#JvmType]))
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)