diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index ad267ab0c9c4..8e36d307d11f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -284,6 +284,14 @@ public void reset() { */ public abstract void close(); + /** + * Compresses or decompresses the data for this column if possible. + * Note: after calling compress(), getters/setters (e.g. getInt) do not work + * until decompress() is called. + */ + public abstract void compress(); + public abstract void decompress(); + public void reserve(int requiredCapacity) { if (requiredCapacity > capacity) { int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index a7d3744d00e9..baae64f91de8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -74,6 +74,14 @@ public void close() { offsetData = 0; } + @Override + public void compress() { + } + + @Override + public void decompress() { + } + // // APIs dealing with nulls // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 94ed32294cfa..9fb3b4053294 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.columnar.compression.ColumnVectorCompressionBuilder; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; @@ -30,6 +31,10 @@ */ public final class OnHeapColumnVector extends ColumnVector { + private boolean compressed; + private ColumnVectorCompressionBuilder compressionBuilder; + private ColumnVectorCompressionBuilder nullCompressionBuilder; + private static final boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); @@ -38,6 +43,7 @@ public final class OnHeapColumnVector extends ColumnVector { // This is faster than a boolean array and we optimize this over memory footprint. private byte[] nulls; + private byte[] compressedNulls; // Array for each type. Only 1 is populated for any type. private byte[] byteData; @@ -46,6 +52,7 @@ private short[] shortData; private int[] intData; private long[] longData; private float[] floatData; private double[] doubleData; + private byte[] compressedData; // Only set if type is Array. private int[] arrayLengths; @@ -70,6 +77,81 @@ public long nullsNativeAddress() { public void close() { } + @Override + public void compress() { + if (compressed) return; + + Object inputData = + (type instanceof BooleanType || type instanceof ByteType) ? byteData : + (type instanceof ShortType) ? shortData : + (type instanceof IntegerType) ? intData : + (type instanceof LongType) ? longData : + (type instanceof FloatType) ? floatData : + (type instanceof DoubleType) ?
doubleData : null; + if (inputData != null) { + if (compressionBuilder == null) { + compressionBuilder = new ColumnVectorCompressionBuilder((AtomicType) type); + } + byte[] out = compressionBuilder.compress(inputData); + if (out != null) { + compressedData = out; + byteData = null; + shortData = null; + intData = null; + longData = null; + floatData = null; + doubleData = null; + } + } + + if (nulls != null) { + if (nullCompressionBuilder == null) { + nullCompressionBuilder = new ColumnVectorCompressionBuilder(BooleanType$.MODULE$); + } + byte[] out = nullCompressionBuilder.compress(nulls); + if (out != null) { + compressedNulls = out; + nulls = null; + } + } + + compressed = (compressedData != null) || (compressedNulls != null); + } + + @Override + public void decompress() { + if (!compressed) return; + + if (compressedData != null) { + Object outputData = null; + if (type instanceof BooleanType || type instanceof ByteType) { + outputData = byteData = new byte[capacity]; + } else if (type instanceof ShortType) { + outputData = shortData = new short[capacity]; + } else if (type instanceof IntegerType) { + outputData = intData = new int[capacity]; + } else if (type instanceof LongType) { + outputData = longData = new long[capacity]; + } else if (type instanceof FloatType) { + outputData = floatData = new float[capacity]; + } else if (type instanceof DoubleType) { + outputData = doubleData = new double[capacity]; + } + if (outputData != null) { + compressionBuilder.decompress(compressedData, outputData); + compressedData = null; + } + } + + if (compressedNulls != null) { + nulls = new byte[capacity]; + nullCompressionBuilder.decompress(compressedNulls, nulls); + compressedNulls = null; + } + + compressed = false; + } + // // APIs dealing with nulls // diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala index 703bde25316d..9866e8a8e4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala @@ -21,6 +21,7 @@ import java.math.{BigDecimal, BigInteger} import java.nio.ByteBuffer import scala.annotation.tailrec +import scala.language.existentials import scala.reflect.runtime.universe.TypeTag import org.apache.spark.sql.catalyst.InternalRow @@ -141,6 +142,18 @@ private[columnar] sealed abstract class ColumnType[JvmType] { def clone(v: JvmType): JvmType = v override def toString: String = getClass.getSimpleName.stripSuffix("$") + + /** + * Extracts a value out of the buffer at the ArrayBuffer's current position + * Subclasses should override this method to avoid boxing/unboxing costs whenever possible. + */ + def get(buffer: ArrayBuffer): JvmType + + /** + * Store value of type T#InternalType into the given ArrayBuffer at the current position + * Subclasses should override this method to avoid boxing/unboxing costs whenever possible. 
+ */ + def put(buffer: ArrayBuffer, value: JvmType): Unit } private[columnar] object NULL extends ColumnType[Any] { @@ -151,6 +164,8 @@ private[columnar] object NULL extends ColumnType[Any] { override def extract(buffer: ByteBuffer): Any = null override def setField(row: InternalRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal) override def getField(row: InternalRow, ordinal: Int): Any = null + override def get(buffer: ArrayBuffer): Any = null + override def put(buffer: ArrayBuffer, value: Any): Unit = {} } private[columnar] abstract class NativeColumnType[T <: AtomicType]( @@ -191,6 +206,21 @@ private[columnar] object INT extends NativeColumnType(IntegerType, 4) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { to.setInt(toOrdinal, from.getInt(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Int = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + val value = Platform.getInt(array, buffer.arrayOffset + buffer.pos) + buffer.addPos(defaultSize) + value + } + + override def put(buffer: ArrayBuffer, value: Int): Unit = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + Platform.putInt(array, buffer.arrayOffset + buffer.pos, value) + buffer.addPos(defaultSize) + } } private[columnar] object LONG extends NativeColumnType(LongType, 8) { @@ -219,6 +249,21 @@ private[columnar] object LONG extends NativeColumnType(LongType, 8) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { to.setLong(toOrdinal, from.getLong(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Long = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + val value = Platform.getLong(array, buffer.arrayOffset + buffer.pos) + buffer.addPos(defaultSize) + value + } + + override def put(buffer: ArrayBuffer, value: Long): Unit = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + Platform.putLong(array, buffer.arrayOffset + buffer.pos, value) + buffer.addPos(defaultSize) + } } private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) { @@ -247,6 +292,14 @@ private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { to.setFloat(toOrdinal, from.getFloat(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Float = { + throw new UnsupportedOperationException("Float is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: Float): Unit = { + throw new UnsupportedOperationException("Float is not supported yet") + } } private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) { @@ -275,6 +328,14 @@ private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { to.setDouble(toOrdinal, from.getDouble(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Double = { + throw new UnsupportedOperationException("Double is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: Double): Unit = { + throw new UnsupportedOperationException("Double is not supported yet") + } } private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) { @@ -301,6 +362,21 @@ private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { 
to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Boolean = { + val array = buffer.array + assert(buffer.pos < array.length) + val value = Platform.getByte(array, buffer.arrayOffset + buffer.pos) + buffer.addPos(defaultSize) + value == 1 + } + + override def put(buffer: ArrayBuffer, value: Boolean): Unit = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + Platform.putByte(array, buffer.arrayOffset + buffer.pos, if (value) 1 else 0) + buffer.addPos(defaultSize) + } } private[columnar] object BYTE extends NativeColumnType(ByteType, 1) { @@ -329,6 +405,21 @@ private[columnar] object BYTE extends NativeColumnType(ByteType, 1) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { to.setByte(toOrdinal, from.getByte(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Byte = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + val value = Platform.getByte(array, buffer.arrayOffset + buffer.pos) + buffer.addPos(defaultSize) + value + } + + override def put(buffer: ArrayBuffer, value: Byte): Unit = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + Platform.putByte(array, buffer.arrayOffset + buffer.pos, value) + buffer.addPos(defaultSize) + } } private[columnar] object SHORT extends NativeColumnType(ShortType, 2) { @@ -357,6 +448,21 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) { override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { to.setShort(toOrdinal, from.getShort(fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Short = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + val value = Platform.getShort(array, buffer.arrayOffset + buffer.pos) + buffer.addPos(defaultSize) + value + } + + override def put(buffer: ArrayBuffer, value: Short): Unit = { + val array = buffer.array + assert(buffer.pos < array.length * defaultSize) + Platform.putShort(array, buffer.arrayOffset + buffer.pos, value) + buffer.addPos(defaultSize) + } } /** @@ -424,6 +530,14 @@ private[columnar] object STRING } override def clone(v: UTF8String): UTF8String = v.clone() + + override def get(buffer: ArrayBuffer): UTF8String = { + throw new UnsupportedOperationException("UTF8String is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: UTF8String): Unit = { + throw new UnsupportedOperationException("UTF8String is not supported yet") + } } private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int) @@ -466,6 +580,14 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int) override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) { setField(to, toOrdinal, getField(from, fromOrdinal)) } + + override def get(buffer: ArrayBuffer): Decimal = { + throw new UnsupportedOperationException("Decimal is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: Decimal): Unit = { + throw new UnsupportedOperationException("Decimal is not supported yet") + } } private[columnar] object COMPACT_DECIMAL { @@ -509,6 +631,14 @@ private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) { row.getBinary(ordinal).length + 4 } + override def get(buffer: ArrayBuffer): Array[Byte] = { + throw new UnsupportedOperationException("Binary is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: Array[Byte]): Unit = { + throw new 
UnsupportedOperationException("Binary is not supported yet") + } + def serialize(value: Array[Byte]): Array[Byte] = value def deserialize(bytes: Array[Byte]): Array[Byte] = bytes } @@ -530,6 +660,14 @@ private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int) 4 + getField(row, ordinal).toJavaBigDecimal.unscaledValue().bitLength() / 8 + 1 } + override def get(buffer: ArrayBuffer): Decimal = { + throw new UnsupportedOperationException("Decimal is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: Decimal): Unit = { + throw new UnsupportedOperationException("Decimal is not supported yet") + } + override def serialize(value: Decimal): Array[Byte] = { value.toJavaBigDecimal.unscaledValue().toByteArray } @@ -565,6 +703,14 @@ private[columnar] case class STRUCT(dataType: StructType) 4 + getField(row, ordinal).getSizeInBytes } + override def get(buffer: ArrayBuffer): UnsafeRow = { + throw new UnsupportedOperationException("Struct is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: UnsafeRow): Unit = { + throw new UnsupportedOperationException("Struct is not supported yet") + } + override def append(value: UnsafeRow, buffer: ByteBuffer): Unit = { buffer.putInt(value.getSizeInBytes) value.writeTo(buffer) @@ -604,6 +750,14 @@ private[columnar] case class ARRAY(dataType: ArrayType) 4 + unsafeArray.getSizeInBytes } + override def get(buffer: ArrayBuffer): UnsafeArrayData = { + throw new UnsupportedOperationException("Array is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: UnsafeArrayData): Unit = { + throw new UnsupportedOperationException("Array is not supported yet") + } + override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = { buffer.putInt(value.getSizeInBytes) value.writeTo(buffer) @@ -643,6 +797,14 @@ private[columnar] case class MAP(dataType: MapType) 4 + unsafeMap.getSizeInBytes } + override def get(buffer: ArrayBuffer): UnsafeMapData = { + throw new UnsupportedOperationException("Map is not supported yet") + } + + override def put(buffer: ArrayBuffer, value: UnsafeMapData): Unit = { + throw new UnsupportedOperationException("Map is not supported yet") + } + override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = { buffer.putInt(value.getSizeInBytes) value.writeTo(buffer) @@ -688,3 +850,52 @@ private[columnar] object ColumnType { } } } + +case class ArrayBuffer(array: Array[_]) { + val (arrayOffset, elementSize) = array match { + case _: Array[Boolean] => (Platform.BOOLEAN_ARRAY_OFFSET, 1) + case _: Array[Byte] => (Platform.BYTE_ARRAY_OFFSET, 1) + case _: Array[Short] => (Platform.SHORT_ARRAY_OFFSET, 2) + case _: Array[Int] => (Platform.INT_ARRAY_OFFSET, 4) + case _: Array[Long] => (Platform.LONG_ARRAY_OFFSET, 8) + case _: Array[Float] => (Platform.FLOAT_ARRAY_OFFSET, 4) + case _: Array[Double] => (Platform.DOUBLE_ARRAY_OFFSET, 8) + case other => throw new UnsupportedOperationException(s"unsupported type: $other") + } + + private var _pos: Int = 0 + + def pos(): Int = { _pos } + + def addPos(value: Int): Unit = { _pos += value } + + def remaining(): Int = { array.length * elementSize - _pos} + + def hasRemaining(): Boolean = { remaining > 0 } + + def getInt(): Int = { + assert(_pos < array.length * elementSize) + val value = Platform.getInt(array, arrayOffset + _pos) + _pos += 4 + value + } + + def getLong(): Long = { + assert(_pos < array.length * elementSize) + val value = Platform.getLong(array, arrayOffset + _pos) + _pos += 8 + value + } + + def putInt(value: Int): Unit = { + 
assert(_pos < array.length * elementSize) + Platform.putInt(array, arrayOffset + _pos, value) + _pos += 4 + } + + def putLong(value: Long): Unit = { + assert(_pos < array.length * elementSize) + Platform.putLong(array, arrayOffset + _pos, value) + _pos += 8 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala index d1fece05a841..206daf26b151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala @@ -21,8 +21,8 @@ import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.columnar.{ColumnBuilder, NativeColumnBuilder} -import org.apache.spark.sql.types.AtomicType +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform /** @@ -112,3 +112,53 @@ private[columnar] object CompressibleColumnBuilder { val unaligned = Platform.unaligned() } + +class ColumnVectorCompressionBuilder[T <: AtomicType](dataType: T) extends Logging { + protected def isWorthCompressing(encoder: Encoder[T]) = { + CompressibleColumnBuilder.unaligned && encoder.compressionRatio < 0.8 + } + + val columnType: NativeColumnType[T] = { + val n = dataType match { + case _: BooleanType => BOOLEAN + case _: ByteType => BYTE + case _: ShortType => SHORT + case _: IntegerType | _: DateType => INT + case _: LongType | _: TimestampType => LONG + case _: FloatType => FLOAT + case _: DoubleType => DOUBLE + case other => throw new UnsupportedOperationException(s"Unsupported type: $other") + } + n.asInstanceOf[NativeColumnType[T]] + } + + def compress(in: Array[_]): Array[Byte] = { + // TODO: use CompressionScheme.all once every scheme implements the array-based path + val schemes = Seq(RunLengthEncoding, BooleanBitSet) + val compressionEncoders = + schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType)) + + compressionEncoders.foreach(_.gatherCompressibilityStats(ArrayBuffer(in))) + + val encoder: Encoder[T] = { + val candidate = compressionEncoders.minBy(_.compressionRatio) + if (isWorthCompressing(candidate)) candidate else null + } + + if (encoder != null) { + logDebug(s"Compressor: $encoder, ratio: ${encoder.compressionRatio}") + val size = encoder.compressedSize + val compressedBuffer = new Array[Byte](4 + size) + encoder.compress(ArrayBuffer(in), ArrayBuffer(compressedBuffer)) + compressedBuffer + } else { + null + } + } + + def decompress(in: Array[Byte], out: Array[_]): Unit = { + CompressionScheme(Platform.getInt(in, Platform.BYTE_ARRAY_OFFSET)) + .decoder[T](ByteBuffer.wrap(in), columnType) + .decompress(ArrayBuffer(out)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala index 6e4f1c5b8068..13c64c92606a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala @@ -20,12 +20,14 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.{ByteBuffer,
ByteOrder} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType} +import org.apache.spark.sql.execution.columnar.{ArrayBuffer, ColumnType, NativeColumnType} import org.apache.spark.sql.types.AtomicType private[columnar] trait Encoder[T <: AtomicType] { def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {} + def gatherCompressibilityStats(in: ArrayBuffer): Unit = { } + def compressedSize: Int def uncompressedSize: Int @@ -35,12 +37,16 @@ private[columnar] trait Encoder[T <: AtomicType] { } def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer + + def compress(from: ArrayBuffer, to: ArrayBuffer): Unit } private[columnar] trait Decoder[T <: AtomicType] { def next(row: InternalRow, ordinal: Int): Unit def hasNext: Boolean + + def decompress(values: ArrayBuffer): Unit } private[columnar] trait CompressionScheme { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala index ee99c90a751d..f513ef0a89d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -51,6 +51,10 @@ private[columnar] case object PassThrough extends CompressionScheme { to.putInt(PassThrough.typeId).put(from).rewind() to } + + override def compress(from: ArrayBuffer, to: ArrayBuffer): Unit = { + throw new IllegalStateException("Not supported in PassThrough (should not be called)") + } } class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) @@ -61,6 +65,10 @@ private[columnar] case object PassThrough extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(values: ArrayBuffer): Unit = { + throw new IllegalStateException("Not supported in PassThrough (should not be called)") + } } } @@ -113,6 +121,24 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme { } } + override def gatherCompressibilityStats(in: ArrayBuffer): Unit = { + val actualSize = columnType.actualSize(null, 0) + _uncompressedSize = actualSize * in.array.length + + if (in.hasRemaining) { + var lastValue = columnType.get(in) + + while (in.hasRemaining) { + val value = columnType.get(in) + if (lastValue != value) { + _compressedSize += actualSize + 4 + lastValue = value + } + } + _compressedSize += actualSize + 4 + } + } + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { to.putInt(RunLengthEncoding.typeId) @@ -147,6 +173,34 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme { to.rewind() to } + + override def compress(from: ArrayBuffer, to: ArrayBuffer): Unit = { + to.putInt(RunLengthEncoding.typeId) + + if (from.hasRemaining) { + var currentValue = columnType.get(from) + var currentRun = 1 + + while (from.hasRemaining) { + val value = columnType.get(from) + if (value == currentValue) { + currentRun += 1 + } else { + // Writes current run + columnType.put(to, currentValue) + to.putInt(currentRun) + + // Resets current run + currentValue = value + currentRun = 1 + } + } + + // Writes the last run + columnType.put(to, currentValue) + to.putInt(currentRun) + } + } } class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) @@ -169,6 +223,26 @@ private[columnar] case object
RunLengthEncoding extends CompressionScheme { } override def hasNext: Boolean = valueCount < run || buffer.hasRemaining + + override def decompress(values: ArrayBuffer): Unit = { + val buf = ArrayBuffer(buffer.array) + + // read typeID + buf.getInt() + + var run = 0 + var valueCount = 0 + while (valueCount < run || buf.hasRemaining) { + if (valueCount == run) { + currentValue = columnType.get(buf) + run = buf.getInt() + valueCount = 1 + } else { + valueCount += 1 + } + columnType.put(values, currentValue) + } + } } } @@ -263,6 +337,10 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { override def uncompressedSize: Int = _uncompressedSize override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2 + + override def compress(from: ArrayBuffer, to: ArrayBuffer): Unit = { + throw new UnsupportedOperationException + } } class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) @@ -278,6 +356,10 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { } override def hasNext: Boolean = buffer.hasRemaining + + override def decompress(values: ArrayBuffer): Unit = { + throw new IllegalStateException("Not supported in DictionaryEncoding") + } } } @@ -304,6 +386,10 @@ private[columnar] case object BooleanBitSet extends CompressionScheme { _uncompressedSize += BOOLEAN.defaultSize } + + override def gatherCompressibilityStats(in: ArrayBuffer): Unit = { + _uncompressedSize += in.array.length * BOOLEAN.defaultSize + } + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { to.putInt(BooleanBitSet.typeId) // Total element count (1 byte per Boolean value) @@ -341,6 +427,35 @@ private[columnar] case object BooleanBitSet extends CompressionScheme { to } + override def compress(from: ArrayBuffer, to: ArrayBuffer): Unit = { + to.putInt(BooleanBitSet.typeId) + to.putInt(from.array.length) + + while (from.remaining >= BITS_PER_LONG) { + var word = 0: Long + var i = 0 + while (i < BITS_PER_LONG) { + if (BOOLEAN.get(from)) { + word |= (1: Long) << i + } + i += 1 + } + to.putLong(word) + } + + if (from.hasRemaining) { + var word = 0: Long + var i = 0 + while (from.hasRemaining) { + if (BOOLEAN.get(from)) { + word |= (1: Long) << i + } + i += 1 + } + to.putLong(word) + } + } + override def uncompressedSize: Int = _uncompressedSize override def compressedSize: Int = { @@ -368,6 +483,26 @@ private[columnar] case object BooleanBitSet extends CompressionScheme { } override def hasNext: Boolean = visited < count + + override def decompress(values: ArrayBuffer): Unit = { + val buf = ArrayBuffer(buffer.array) + + // read typeID + buf.getInt() + val count = buf.getInt() + + var currentWord = 0L + var visited = 0 + while (visited < count) { + val bit = visited % BITS_PER_LONG + if (bit == 0) { + currentWord = buf.getLong() + } + val value = ((currentWord >> bit) & 1) != 0 + BOOLEAN.put(values, value) + visited += 1 + } + } } } @@ -434,6 +569,10 @@ private[columnar] case object IntDelta extends CompressionScheme { to.rewind().asInstanceOf[ByteBuffer] } + + override def compress(from: ArrayBuffer, to: ArrayBuffer): Unit = { + throw new UnsupportedOperationException + } } class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type]) @@ -448,6 +587,10 @@ private[columnar] case object IntDelta extends CompressionScheme { prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer) row.setInt(ordinal, prev) } + + override def decompress(values: ArrayBuffer): Unit = { + throw new
IllegalStateException("Not supported in IntDelta") + } } } @@ -514,6 +657,10 @@ private[columnar] case object LongDelta extends CompressionScheme { to.rewind().asInstanceOf[ByteBuffer] } + + override def compress(from: ArrayBuffer, to: ArrayBuffer): Unit = { + throw new IllegalStateException("Not supported in LongDelta") + } } class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type]) @@ -528,5 +675,9 @@ private[columnar] case object LongDelta extends CompressionScheme { prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer) row.setLong(ordinal, prev) } + + override def decompress(values: ArrayBuffer): Unit = { + throw new IllegalStateException("Not supported in LongDelta") + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala index d01bf911e3a7..3c628cf97d74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.execution.columnar.compression +import java.nio.ByteBuffer + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats} +import org.apache.spark.sql.execution.columnar.{ArrayBuffer, BOOLEAN, NoopColumnStats} import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ class BooleanBitSetSuite extends SparkFunSuite { @@ -104,4 +106,76 @@ class BooleanBitSetSuite extends SparkFunSuite { test(s"$BooleanBitSet: multiple words and 1 more bit") { skeleton(BITS_PER_LONG * 2 + 1) } + + def skeletonForBatch(count: Int) { + // ------------- + // Tests encoder + // ------------- + val inputArray = new Array[Boolean](count) + (0 until inputArray.length).foreach { i => + inputArray(i) = makeRandomValues(BOOLEAN)(0).asInstanceOf[Boolean]} + + val encoder = BooleanBitSet.encoder(BOOLEAN) + encoder.gatherCompressibilityStats(ArrayBuffer(inputArray)) + + val size = encoder.compressedSize + val compressedArray = new Array[Byte](4 + size) + encoder.compress(ArrayBuffer(inputArray), ArrayBuffer(compressedArray)) + + // Compression scheme ID + element count + bitset words + val compressedSize = 4 + 4 + { + val extra = if (count % BITS_PER_LONG == 0) 0 else 1 + (count / BITS_PER_LONG + extra) * 8 + } + + // 4 extra bytes for compression scheme type ID + assertResult(compressedSize, "Wrong buffer capacity")(compressedArray.size) + + // Skips column header + val buffer = ArrayBuffer(compressedArray) + assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt()) + assertResult(count, "Wrong element count")(buffer.getInt()) + + var word = 0: Long + for (i <- 0 until count) { + val bit = i % BITS_PER_LONG + word = if (bit == 0) buffer.getLong() else word + assertResult(inputArray(i), s"Wrong value in compressed buffer, index=$i") { + (word & ((1: Long) << bit)) != 0 + } + } + + // ------------- + // Tests decoder + // ------------- + val outputArray = new Array[Boolean](inputArray.length) + val decoder = BooleanBitSet.decoder(ByteBuffer.wrap(compressedArray), BOOLEAN) + decoder.decompress(ArrayBuffer(outputArray)) + + if (inputArray.nonEmpty) { + inputArray.zipWithIndex.foreach { case
(value, i) => + assertResult(value, "Wrong decoded value")(outputArray(i)) + } + } + } + + test(s"$BooleanBitSet Batch: empty") { + skeletonForBatch(0) + } + + test(s"$BooleanBitSet Batch: less than 1 word") { + skeletonForBatch(BITS_PER_LONG - 1) + } + + test(s"$BooleanBitSet Batch: exactly 1 word") { + skeletonForBatch(BITS_PER_LONG) + } + + test(s"$BooleanBitSet Batch: multiple whole words") { + skeletonForBatch(BITS_PER_LONG * 2) + } + + test(s"$BooleanBitSet Batch: multiple words and 1 more bit") { + skeletonForBatch(BITS_PER_LONG * 2 + 1) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala index dffa9b364ebf..f73f08f87b54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution.columnar.compression +import java.nio.ByteBuffer + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.AtomicType +import org.apache.spark.sql.types._ class RunLengthEncodingSuite extends SparkFunSuite { testRunLengthEncoding(new NoopColumnStats, BOOLEAN) @@ -111,4 +113,95 @@ class RunLengthEncodingSuite extends SparkFunSuite { skeleton(1, Seq(0 -> 1000)) } } + + testRunLengthEncoding(BooleanType, BOOLEAN) + testRunLengthEncoding(ByteType, BYTE) + testRunLengthEncoding(ShortType, SHORT) + testRunLengthEncoding(IntegerType, INT) + testRunLengthEncoding(LongType, LONG) + + def testRunLengthEncoding[T <: AtomicType]( + dataType: DataType, + columnType: NativeColumnType[T]) { + + def generateArray(dataType: DataType, length: Int): Array[T#InternalType] = { + (dataType match { + case BooleanType => new Array[Boolean](length) + case ByteType => new Array[Byte](length) + case ShortType => new Array[Short](length) + case IntegerType => new Array[Int](length) + case LongType => new Array[Long](length) + case other => throw new UnsupportedOperationException(s"Unsupported type: $other") + }).asInstanceOf[Array[T#InternalType]] + } + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) { + // ------------- + // Tests encoder + // ------------- + + val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) + val inputSeq = inputRuns.flatMap { case (index, run) => + Seq.fill(run)(index) + } + val inputArray = generateArray(dataType, inputSeq.length) + inputSeq.zipWithIndex.foreach { case (i, index) => inputArray(index) = values(i) } + + val encoder = RunLengthEncoding.encoder(columnType) + encoder.gatherCompressibilityStats(ArrayBuffer(inputArray)) + + val size = encoder.compressedSize + val compressedArray = new Array[Byte](4 + size) + encoder.compress(ArrayBuffer(inputArray), ArrayBuffer(compressedArray)) + val buffer = ArrayBuffer(compressedArray) + + // Compression scheme ID + compressed contents + val compressedSize = 4 + inputRuns.map { _ => + // 4 extra bytes each run for run length + columnType.actualSize(null, 0) + 4 + }.sum + + // 4 extra bytes for compression scheme type ID + 
assertResult(compressedSize, "Wrong buffer capacity")(compressedArray.length) + + // Skips column header + assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + inputRuns.foreach { case (index, run) => + assertResult(values(index), "Wrong column element value")(columnType.get(buffer)) + assertResult(run, "Wrong run length")(buffer.getInt()) + } + + // ------------- + // Tests decoder + // ------------- + val outputArray = generateArray(dataType, inputArray.length) + val decoder = RunLengthEncoding.decoder(ByteBuffer.wrap(compressedArray), columnType) + decoder.decompress(ArrayBuffer(outputArray)) + + if (inputArray.nonEmpty) { + inputArray.zipWithIndex.foreach { case (value, i) => + assertResult(value, "Wrong decoded value")(outputArray(i)) + } + } + } + + test(s"$RunLengthEncoding Batch with $typeName: empty column") { + skeleton(0, Seq.empty) + } + + test(s"$RunLengthEncoding Batch with $typeName: simple case") { + skeleton(2, Seq(0 -> 2, 1 -> 2)) + } + + test(s"$RunLengthEncoding Batch with $typeName: run length == 1") { + skeleton(2, Seq(0 -> 1, 1 -> 1)) + } + + test(s"$RunLengthEncoding Batch with $typeName: single long run") { + skeleton(1, Seq(0 -> 1000)) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index e48e3f640290..0c46989f4ff0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1077,4 +1077,40 @@ class ColumnarBatchSuite extends SparkFunSuite { s"vectorized reader")) } } + + private def getPrivateValue(column : ColumnVector, fieldName : String) : Any = { + val cls = classOf[OnHeapColumnVector] + val field = cls.getDeclaredField(fieldName) + field.setAccessible(true) + field.get(column) + } + + def performCompressDecompress(column: ColumnVector, dataFieldName: String): Unit = { + column.compress() + assert(getPrivateValue(column, "compressed").asInstanceOf[Boolean]) + assert((getPrivateValue(column, dataFieldName) == null)) + assert((getPrivateValue(column, "compressedData") != null)) + assert((getPrivateValue(column, "nulls") == null)) + assert((getPrivateValue(column, "compressedNulls") != null)) + column.decompress() + assert(getPrivateValue(column, "compressed").asInstanceOf[Boolean] == false) + assert((getPrivateValue(column, dataFieldName) != null)) + assert((getPrivateValue(column, "compressedData") == null)) + assert((getPrivateValue(column, "nulls") != null)) + assert((getPrivateValue(column, "compressedNulls") == null)) + + } + + test("compress and decompress") { + val len = 1024 + // data for RLE + val intArray = Array.tabulate(len)(i => (i / (len / 4))) + val column = ColumnVector.allocate(1024, IntegerType, MemoryMode.ON_HEAP) + (0 until len).foreach(i => column.putInt(i, intArray(i))) + performCompressDecompress(column, "intData") + (0 until len).foreach(i => assert(column.getInt(i) == intArray(i))) + performCompressDecompress(column, "intData") + (0 until len).foreach(i => assert(column.getInt(i) == intArray(i))) + column.close + } }
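
A minimal usage sketch of the compress()/decompress() lifecycle added to ColumnVector, assuming a build with this patch applied (e.g. pasted into a spark-shell of that build). Whether the typed arrays are actually replaced by compressed bytes depends on the estimated ratio checked by isWorthCompressing above.

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types.IntegerType

val len = 1024
val column = ColumnVector.allocate(len, IntegerType, MemoryMode.ON_HEAP)
// Long runs of equal values, so RunLengthEncoding is the natural encoder here
(0 until len).foreach(i => column.putInt(i, i / 256))

column.compress()      // getters/setters must not be used until decompress()
// ... keep the vector around in its compact form ...
column.decompress()    // rebuilds the plain int[] from the compressed bytes
assert(column.getInt(513) == 2)
column.close()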
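The same path can be driven directly through ColumnVectorCompressionBuilder, which is what OnHeapColumnVector.compress() does internally. A REPL-style sketch with RLE-friendly data; compress() returns null when no encoder reaches the 0.8 ratio threshold.

import org.apache.spark.sql.execution.columnar.compression.ColumnVectorCompressionBuilder
import org.apache.spark.sql.types.IntegerType

val builder = new ColumnVectorCompressionBuilder(IntegerType)
val input = Array.fill(1000)(1) ++ Array.fill(1000)(2)   // two long runs

val compressed = builder.compress(input)                 // Array[Byte], or null if not worthwhile
if (compressed != null) {
  val restored = new Array[Int](input.length)
  builder.decompress(compressed, restored)
  assert(restored.sameElements(input))
}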
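For reference, the array-based RunLengthEncoding encoder above writes a layout of [typeId: Int] followed by [value][run length: Int] pairs, using Platform and therefore native byte order. The helper below is not part of the patch; it is a small standalone reader sketch for an INT column, assuming the buffer was produced by the builder shown above.

import java.nio.{ByteBuffer, ByteOrder}

def dumpIntRuns(compressed: Array[Byte]): Seq[(Int, Int)] = {
  val buf = ByteBuffer.wrap(compressed).order(ByteOrder.nativeOrder())
  val typeId = buf.getInt()                     // RunLengthEncoding.typeId
  val runs = Seq.newBuilder[(Int, Int)]
  while (buf.hasRemaining) {
    runs += ((buf.getInt(), buf.getInt()))      // (value, run length)
  }
  runs.result()
}

// e.g. dumpIntRuns(builder.compress(input)) == Seq((1, 1000), (2, 1000))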
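BooleanBitSet's array-based encoder packs 64 values per long after a [typeId: Int][element count: Int] header, again in native byte order. A hypothetical reader for that layout, mirroring the decoder logic in the patch:

import java.nio.{ByteBuffer, ByteOrder}

def unpackBooleans(compressed: Array[Byte]): Array[Boolean] = {
  val buf = ByteBuffer.wrap(compressed).order(ByteOrder.nativeOrder())
  buf.getInt()                                  // BooleanBitSet.typeId, not needed here
  val count = buf.getInt()
  val out = new Array[Boolean](count)
  var word = 0L
  var i = 0
  while (i < count) {
    val bit = i % 64                            // BITS_PER_LONG
    if (bit == 0) word = buf.getLong()
    out(i) = ((word >> bit) & 1L) != 0L
    i += 1
  }
  out
}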
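The ArrayBuffer wrapper added in ColumnType.scala is just a cursor over a primitive array; each NativeColumnType.get/put call advances it by defaultSize bytes. A round-trip sketch; since INT is private[columnar], it assumes the snippet is compiled inside that package.

package org.apache.spark.sql.execution.columnar

object ArrayBufferRoundTrip {
  def main(args: Array[String]): Unit = {
    val bytes = new Array[Byte](3 * 4)               // room for three ints
    val writer = ArrayBuffer(bytes)
    Seq(7, 7, 42).foreach(v => INT.put(writer, v))   // each put advances pos by 4

    val reader = ArrayBuffer(bytes)                  // fresh wrapper, pos = 0
    assert(Seq(INT.get(reader), INT.get(reader), INT.get(reader)) == Seq(7, 7, 42))
    assert(!reader.hasRemaining)
  }
}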
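The null bitmap follows the same route: OnHeapColumnVector hands its byte-per-row nulls array to a BooleanType builder, so the common all-not-null case collapses to a single RLE run. A REPL-style sketch of that path in isolation, under the same assumptions as above:

import org.apache.spark.sql.execution.columnar.compression.ColumnVectorCompressionBuilder
import org.apache.spark.sql.types.BooleanType

val nullBuilder = new ColumnVectorCompressionBuilder(BooleanType)
val nulls = new Array[Byte](4096)             // all zeroes: no row is null
val packedNulls = nullBuilder.compress(nulls) // a few bytes, or null if deemed not worthwhile
if (packedNulls != null) {
  val restored = new Array[Byte](nulls.length)
  nullBuilder.decompress(packedNulls, restored)
  assert(restored.sameElements(nulls))
}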