Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ public void reset() {
*/
public abstract void close();

/**
* Compress or decompress data for this column if possible.
* Note: After calling compress(), gettter/setter (e.g. getInt) do not work
* until decompress() will be called
*/
public abstract void compress();
public abstract void decompress();

public void reserve(int requiredCapacity) {
if (requiredCapacity > capacity) {
int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public void close() {
offsetData = 0;
}

@Override
public void compress() {
}

@Override
public void decompress() {
}

//
// APIs dealing with nulls
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.columnar.compression.ColumnVectorCompressionBuilder;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;

Expand All @@ -30,6 +31,10 @@
*/
public final class OnHeapColumnVector extends ColumnVector {

private boolean compressed;
private ColumnVectorCompressionBuilder compressionBuilder;
private ColumnVectorCompressionBuilder nullCompressionBuilder;

private static final boolean bigEndianPlatform =
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);

Expand All @@ -38,6 +43,7 @@ public final class OnHeapColumnVector extends ColumnVector {

// This is faster than a boolean array and we optimize this over memory footprint.
private byte[] nulls;
private byte[] compressedNulls;

// Array for each type. Only 1 is populated for any type.
private byte[] byteData;
Expand All @@ -46,6 +52,7 @@ public final class OnHeapColumnVector extends ColumnVector {
private long[] longData;
private float[] floatData;
private double[] doubleData;
private byte[] compressedData;

// Only set if type is Array.
private int[] arrayLengths;
Expand All @@ -70,6 +77,81 @@ public long nullsNativeAddress() {
public void close() {
}

@Override
public void compress() {
if (compressed) return;

Object inputData =
(type instanceof BooleanType || type instanceof ByteType) ? byteData :
(type instanceof ShortType) ? shortData :
(type instanceof IntegerType) ? intData :
(type instanceof LongType) ? longData :
(type instanceof FloatType) ? floatData :
(type instanceof DoubleType) ? doubleData : null;
if (inputData != null) {
if (compressionBuilder == null) {
compressionBuilder = new ColumnVectorCompressionBuilder((AtomicType) type);
}
byte[] out = compressionBuilder.compress(inputData);
if (out != null) {
compressedData = out;
byteData = null;
shortData = null;
intData = null;
longData = null;
floatData = null;
doubleData = null;
}
}

if (nulls != null) {
if (nullCompressionBuilder == null) {
nullCompressionBuilder = new ColumnVectorCompressionBuilder(BooleanType$.MODULE$);
}
byte[] out = nullCompressionBuilder.compress(nulls);
if (out != null) {
compressedNulls = out;
nulls = null;
}
}

compressed = (compressedData != null) || (compressedNulls != null);
}

@Override
public void decompress() {
if (!compressed) return;

if (compressedData != null) {
Object outputData = null;
if (type instanceof BooleanType || type instanceof ByteType) {
outputData = byteData = new byte[capacity];
} else if (type instanceof ShortType) {
outputData = shortData = new short[capacity];
} else if (type instanceof IntegerType) {
outputData = intData = new int[capacity];
} else if (type instanceof LongType) {
outputData = longData = new long[capacity];
} else if (type instanceof FloatType) {
outputData = floatData = new float[capacity];
} else if (type instanceof DoubleType) {
outputData = doubleData = new double[capacity];
}
if (outputData != null) {
compressionBuilder.decompress(compressedData, outputData);
compressedData = null;
}
}

if (compressedNulls != null) {
nulls = new byte[capacity];
nullCompressionBuilder.decompress(compressedNulls, nulls);
compressedNulls = null;
}

compressed = false;
}

//
// APIs dealing with nulls
//
Expand Down
Loading