
Commit 0d0493f

liancheng authored and rxin committed
[SPARK-1402] Added 3 more compression schemes
JIRA issue: [SPARK-1402](https://issues.apache.org/jira/browse/SPARK-1402)

This PR provides 3 more compression schemes for Spark SQL in-memory columnar storage:

* `BooleanBitSet`
* `IntDelta`
* `LongDelta`

Now there are 6 compression schemes in total, including the no-op `PassThrough` scheme.

Also fixed a bug introduced in PR #286: not all compression schemes were registered as available schemes when accessing an in-memory column, so a column compressed with an unrecognised scheme caused `ColumnAccessor` to throw an exception.

Author: Cheng Lian <[email protected]>

Closes #330 from liancheng/moreCompressionSchemes and squashes the following commits:

1d037b8 [Cheng Lian] Fixed SPARK-1436: in-memory column byte buffer must be able to be accessed multiple times
d7c0e8f [Cheng Lian] Added test suite for IntegralDelta (IntDelta & LongDelta)
3c1ad7a [Cheng Lian] Added test suite for BooleanBitSet, refactored other test suites
44fe4b2 [Cheng Lian] Refactored CompressionScheme, added 3 more compression schemes
1 parent f27e56a commit 0d0493f
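
As background for the three new schemes, here is a minimal, self-contained sketch of the delta idea behind `IntDelta` and `LongDelta` (and, by analogy, `BooleanBitSet`, which as its name suggests packs boolean values into individual bits). It is illustrative only, not the encoder/decoder code added in this commit; the `DeltaSketch` name and its `Seq`-based API are made up for the example.

// Illustrative sketch only -- not the IntDelta/LongDelta implementation in this commit.
// Delta encoding stores the first value verbatim and every later value as the
// difference from its predecessor; sorted or slowly-changing columns then consist
// mostly of tiny deltas that fit in far fewer bytes.
object DeltaSketch {
  def encode(values: Seq[Int]): Seq[Int] =
    if (values.isEmpty) Seq.empty
    else values.head +: values.zip(values.tail).map { case (prev, cur) => cur - prev }

  def decode(deltas: Seq[Int]): Seq[Int] =
    deltas.scanLeft(0)(_ + _).tail  // running sum restores the original values

  def main(args: Array[String]): Unit = {
    val column = Seq(100, 101, 103, 104, 110)
    val encoded = encode(column)          // Seq(100, 1, 2, 1, 6)
    assert(decode(encoded) == column)
  }
}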

File tree

11 files changed: +586 −179 lines changed


sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala

Lines changed: 12 additions & 11 deletions
@@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
 private[sql] object ColumnAccessor {
   def apply(buffer: ByteBuffer): ColumnAccessor = {
+    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
     // The first 4 bytes in the buffer indicate the column type.
-    val columnTypeId = buffer.getInt()
+    val columnTypeId = dup.getInt()

     columnTypeId match {
-      case INT.typeId => new IntColumnAccessor(buffer)
-      case LONG.typeId => new LongColumnAccessor(buffer)
-      case FLOAT.typeId => new FloatColumnAccessor(buffer)
-      case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
-      case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
-      case BYTE.typeId => new ByteColumnAccessor(buffer)
-      case SHORT.typeId => new ShortColumnAccessor(buffer)
-      case STRING.typeId => new StringColumnAccessor(buffer)
-      case BINARY.typeId => new BinaryColumnAccessor(buffer)
-      case GENERIC.typeId => new GenericColumnAccessor(buffer)
+      case INT.typeId => new IntColumnAccessor(dup)
+      case LONG.typeId => new LongColumnAccessor(dup)
+      case FLOAT.typeId => new FloatColumnAccessor(dup)
+      case DOUBLE.typeId => new DoubleColumnAccessor(dup)
+      case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+      case BYTE.typeId => new ByteColumnAccessor(dup)
+      case SHORT.typeId => new ShortColumnAccessor(dup)
+      case STRING.typeId => new StringColumnAccessor(dup)
+      case BINARY.typeId => new BinaryColumnAccessor(dup)
+      case GENERIC.typeId => new GenericColumnAccessor(dup)
     }
   }
 }
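
The `duplicate()` above is the SPARK-1436 fix mentioned in the commit message: a duplicate shares the cached column's bytes but has its own position, limit, and byte order, so building a `ColumnAccessor` no longer consumes the underlying buffer. A small standalone sketch of that `ByteBuffer` behaviour (illustrative, not code from this commit):

import java.nio.{ByteBuffer, ByteOrder}

// Illustrative sketch: reading from a duplicate advances only the duplicate's
// position, so the original (cached) buffer can be read again later. Note that
// duplicate() does not carry over byte order, hence the explicit order(...) call,
// mirroring what ColumnAccessor.apply now does.
object DuplicateSketch {
  def main(args: Array[String]): Unit = {
    val cached = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
    cached.putInt(42).putInt(7)
    cached.flip()

    val dup = cached.duplicate().order(ByteOrder.nativeOrder)
    println(dup.getInt())        // 42; dup's position moves to 4
    println(cached.position())   // 0; the cached buffer is untouched
  }
}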

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala

Lines changed: 6 additions & 0 deletions
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.types._

+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * brings significant performance penalty.
+ */
 private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
   /**
    * Closed lower bound of this column.
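
The NOTE in the new scaladoc is about boxing: `Ordering[T]` is a generic interface, so every comparison made through it boxes primitive values on the per-row path, whereas the concrete `ColumnStats` subclasses compare unboxed primitives directly. A tiny illustrative contrast (not code from this commit):

// Comparison through the generic Ordering[A] interface boxes each primitive argument
// on the hot per-row path, while a monomorphic primitive comparison stays unboxed.
def minViaOrdering[A](a: A, b: A)(implicit ord: Ordering[A]): A =
  if (ord.lt(a, b)) a else b   // a and b are boxed when A = Int, Long, ...

def minDirect(a: Int, b: Int): Int =
  if (a < b) a else b          // no boxing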

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala

Lines changed: 3 additions & 3 deletions
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

   import CompressionScheme._

-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])

-  protected def isWorthCompressing(encoder: Encoder) = {
+  protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
   }

@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

   abstract override def build() = {
     val rawBuffer = super.build()
-    val encoder = {
+    val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
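
The selection logic in `build()` picks whichever candidate encoder reports the lowest compression ratio and only commits to it when that ratio beats the 0.8 threshold in `isWorthCompressing`, falling back to `PassThrough` otherwise. A rough sketch of the same decision rule, using a hypothetical `ratios` map in place of real encoders (illustrative only; the numbers are not measurements from this commit):

// Hypothetical per-scheme compression ratios (compressedSize / uncompressedSize).
val ratios = Map(
  "RunLengthEncoding"  -> 0.35,
  "DictionaryEncoding" -> 0.95,
  "IntDelta"           -> 0.60)

val (bestScheme, bestRatio) = ratios.minBy(_._2)
// Keep the best candidate only if it actually saves at least ~20% of the space.
val chosen = if (bestRatio < 0.8) bestScheme else "PassThrough"
// chosen == "RunLengthEncoding"; if every ratio were >= 0.8, chosen would be "PassThrough".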

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala

Lines changed: 13 additions & 15 deletions
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}

-private[sql] trait Encoder {
-  def gatherCompressibilityStats[T <: NativeType](
-      value: T#JvmType,
-      columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+  def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}

   def compressedSize: Int

@@ -35,10 +33,7 @@ private[sql] trait Encoder {
     if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
   }

-  def compress[T <: NativeType](
-      from: ByteBuffer,
-      to: ByteBuffer,
-      columnType: ColumnType[T, T#JvmType]): ByteBuffer
+  def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
 }

 private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {

   def supports(columnType: ColumnType[_, _]): Boolean

-  def encoder: Encoder
+  def encoder[T <: NativeType]: Encoder[T]

   def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
 }
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
 }

 private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
-  override val schemes: Seq[CompressionScheme] = {
-    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
-  }
+  override val schemes: Seq[CompressionScheme] = CompressionScheme.all
 }

 private[sql] object CompressionScheme {
-  def apply(typeId: Int): CompressionScheme = typeId match {
-    case PassThrough.typeId => PassThrough
-    case _ => throw new UnsupportedOperationException()
+  val all: Seq[CompressionScheme] =
+    Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+  private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+  def apply(typeId: Int): CompressionScheme = {
+    typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+      s"Unrecognized compression scheme type ID: $typeId"))
   }

   def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
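
Together with `AllCompressionSchemes`, this is the fix for the PR #286 bug called out in the commit message: every scheme in `CompressionScheme.all` can now be resolved from the type ID stored in a compressed column's header, and an unknown ID fails with a descriptive message instead of only `PassThrough` being recognised. A brief illustrative use of the new lookup (not code from this commit):

// Resolve a scheme object from the type ID stored in a compressed column header.
val scheme = CompressionScheme(BooleanBitSet.typeId)   // returns the BooleanBitSet scheme
// An ID outside CompressionScheme.all now fails loudly:
//   CompressionScheme(Int.MaxValue)
//   => UnsupportedOperationException: Unrecognized compression scheme type ID: 2147483647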
