Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
Expand All @@ -38,31 +39,37 @@ public class ByteBitPackingValuesReader extends ValuesReader {
private final int[] decoded = new int[VALUES_AT_A_TIME];
private int decodedPosition = VALUES_AT_A_TIME - 1;
private ByteBufferInputStream in;
byte[] tempEncode;

public ByteBitPackingValuesReader(int bound, Packer packer) {
// Width (in bits) needed to represent any value in [0, bound].
this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
this.packer = packer.newBytePacker(this.bitWidth);
// Allocate the scratch buffer exactly once up front; readMore() reuses it,
// so the hot decoding path performs no per-call allocation.
this.tempEncode = new byte[this.bitWidth];
}

/**
 * Refills {@code decoded} with the next group of 8 packed values from the
 * input stream and resets {@code decodedPosition}.
 *
 * <p>Reads into the reusable {@code tempEncode} buffer instead of calling
 * {@code in.slice()}, which allocated an object on every refill in the
 * critical path.
 *
 * @throws ParquetDecodingException if the underlying stream cannot be read
 */
private void readMore() {
try {
int avail = in.available();
if (avail < bitWidth) {
// End of stream: fewer than bitWidth bytes remain. Consume what is left
// and zero-pad so unpack8Values still sees a full bitWidth-byte group.
// readFully (rather than read, whose return value the old code ignored)
// guarantees all 'avail' bytes land in the reused buffer; a short read
// would otherwise leave stale bytes from the previous group in place.
in.readFully(tempEncode, 0, avail);
Arrays.fill(tempEncode, avail, bitWidth, (byte)0);
} else {
in.readFully(tempEncode, 0, bitWidth);
}
packer.unpack8Values(tempEncode, 0, decoded, 0);
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read packed values", e);
}
decodedPosition = 0;
}

@Override
// Returns the next unpacked value, refilling the 8-value decode buffer when
// it has been fully consumed.
public int readInteger() {
++ decodedPosition;
if (decodedPosition == decoded.length) {
// NOTE(review): the span below appears to be merge/diff residue — the
// try/catch block is the pre-refactor refill logic (per-call allocation
// and slice()), and the readMore() call after it is its replacement.
// Only one of the two should survive; verify against the intended patch.
try {
if (in.available() < bitWidth) {
// unpack8Values needs at least bitWidth bytes to read from,
// We have to fill in 0 byte at the end of encoded bytes.
byte[] tempEncode = new byte[bitWidth];
in.read(tempEncode, 0, in.available());
packer.unpack8Values(tempEncode, 0, decoded, 0);
} else {
ByteBuffer encoded = in.slice(bitWidth);
packer.unpack8Values(encoded, encoded.position(), decoded, 0);
}
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read packed values", e);
}
decodedPosition = 0;
readMore();
}
return decoded[decodedPosition];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
abstract public class PlainValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class);

protected LittleEndianDataInputStream in;
protected ByteBufferInputStream in;

@Override
// Initializes this reader to decode plain-encoded values from the page stream.
public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
// NOTE(review): the next assignment is the pre-refactor version (diff
// residue); the assignment two lines below replaces it — only one of the
// two should remain after merging.
this.in = new LittleEndianDataInputStream(stream.remainingStream());
// No need for "remainingStream()" since the caller is done with the stream
this.in = stream;
}

@Override
Expand All @@ -47,10 +48,7 @@ public void skip() {
}

// Skips exactly n bytes of the input, looping/raising rather than accepting a
// short skip.
void skipBytesFully(int n) throws IOException {
// NOTE(review): the manual skip loop below is the pre-refactor
// implementation (diff residue); in.skipFully(n) is its replacement —
// only one of the two should survive a merge.
int skipped = 0;
while (skipped < n) {
skipped += in.skipBytes(n - skipped);
}
in.skipFully(n);
}

public static class DoublePlainValuesReader extends PlainValuesReader {
Expand Down Expand Up @@ -100,7 +98,7 @@ public static class IntegerPlainValuesReader extends PlainValuesReader {
@Override
public void skip(int n) {
try {
in.skipBytes(n * 4);
skipBytesFully(n * 4);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip " + n + " ints", e);
}
Expand All @@ -121,7 +119,7 @@ public static class LongPlainValuesReader extends PlainValuesReader {
@Override
public void skip(int n) {
try {
in.skipBytes(n * 8);
skipBytesFully(n * 8);
} catch (IOException e) {
throw new ParquetDecodingException("could not skip " + n + " longs", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStream;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;
Expand All @@ -37,13 +38,16 @@
public class RunLengthBitPackingHybridDecoder {
private static final Logger LOG = LoggerFactory.getLogger(RunLengthBitPackingHybridDecoder.class);

private static enum MODE { RLE, PACKED }

private final int bitWidth;
private final BytePacker packer;
private final InputStream in;
private final ByteBufferInputStream in;

/*
Note: In an older version, this class used to use an enum to keep track of the mode. Switching to a boolean
resulted in a measurable performance improvement.
*/
boolean packed_mode;

private MODE mode;
private int currentCount;
private int currentValue;
private int[] currentBuffer;
Expand All @@ -54,39 +58,41 @@ public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
this.bitWidth = bitWidth;
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
this.in = in;
// Every place in ParquetMR that calls this constructor does so with a ByteBufferInputStream. If some other
// user calls this with some other type, we can rely on the ClassCastException to be thrown.
this.in = (ByteBufferInputStream)in;
}

// Returns the next decoded value, reading a new run header (RLE or
// bit-packed) when the current run is exhausted.
public int readInt() throws IOException {
if (currentCount == 0) {
readNext();
}
// NOTE(review): diff residue below — the two decrements are the old and new
// spellings of the same statement, the switch is the pre-refactor
// enum-based dispatch, and the if/else on packed_mode is its boolean
// replacement. Only one version of each should survive a merge.
-- currentCount;
--currentCount;
int result;
switch (mode) {
case RLE:
result = currentValue;
break;
case PACKED:
result = currentBuffer[currentBuffer.length - 1 - currentCount];
break;
default:
throw new ParquetDecodingException("not a valid mode " + mode);
if (packed_mode) {
return currentBuffer[currentBuffer.length - 1 - currentCount];
} else {
return currentValue;
}
return result;
}

/*
Note: An older version used to create a DataInputStream just to be able to call readFully. This object creation
in the critical path was bad for performance. Since we're using the new ByteBufferInputStream, we can call
its built-in readFully, along with other optimized methods like readUnsignedVarInt and
readIntLittleEndianPaddedOnBitWidth, which replace the use of BytesUtils, which we can't count on being
inlined.
*/

private void readNext() throws IOException {
Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
final int header = BytesUtils.readUnsignedVarInt(in);
mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
case RLE:
final int header = in.readUnsignedVarInt();
packed_mode = (header & 1) != 0;
if (!packed_mode) {
currentCount = header >>> 1;
LOG.debug("reading {} values RLE", currentCount);
currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
break;
case PACKED:
currentValue = in.readIntLittleEndianPaddedOnBitWidth(bitWidth);
} else {
int numGroups = header >>> 1;
currentCount = numGroups * 8;
LOG.debug("reading {} values BIT PACKED", currentCount);
Expand All @@ -95,13 +101,11 @@ private void readNext() throws IOException {
// At the end of the file RLE data though, there might not be that many bytes left.
int bytesToRead = (int)Math.ceil(currentCount * bitWidth / 8.0);
bytesToRead = Math.min(bytesToRead, in.available());
new DataInputStream(in).readFully(bytes, 0, bytesToRead);
in.readFully(bytes, 0, bytesToRead);
for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) {
// It's faster to use an array with unpack8Values than to use a ByteBuffer.
packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
}
break;
default:
throw new ParquetDecodingException("not a valid mode " + mode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Test;

import org.apache.parquet.bytes.DirectByteBufferAllocator;
Expand Down Expand Up @@ -294,7 +295,7 @@ public void testGroupBoundary() throws Exception {
// bit width 2.
bytes[0] = (1 << 1 )| 1;
bytes[1] = (1 << 0) | (2 << 2) | (3 << 4);
ByteArrayInputStream stream = new ByteArrayInputStream(bytes);
ByteBufferInputStream stream = new ByteBufferInputStream(bytes);
RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream);
assertEquals(decoder.readInt(), 1);
assertEquals(decoder.readInt(), 2);
Expand Down
Loading