diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java index 1fa62d4b5a..52424dc5e5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -38,31 +39,37 @@ public class ByteBitPackingValuesReader extends ValuesReader { private final int[] decoded = new int[VALUES_AT_A_TIME]; private int decodedPosition = VALUES_AT_A_TIME - 1; private ByteBufferInputStream in; + byte[] tempEncode; public ByteBitPackingValuesReader(int bound, Packer packer) { this.bitWidth = BytesUtils.getWidthFromMaxInt(bound); this.packer = packer.newBytePacker(bitWidth); + // Keep reusing this buffer to eliminate object creation in the critical path + tempEncode = new byte[bitWidth]; + } + + private void readMore() { + try { + // This eliminates the use of slice(), which is slow because of object creation in the critical path. + int avail = in.available(); + if (avail < bitWidth) { + in.read(tempEncode, 0, avail); + Arrays.fill(tempEncode, avail, bitWidth, (byte)0); + } else { + in.read(tempEncode, 0, bitWidth); + } + packer.unpack8Values(tempEncode, 0, decoded, 0); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to read packed values", e); + } + decodedPosition = 0; } @Override public int readInteger() { ++ decodedPosition; if (decodedPosition == decoded.length) { - try { - if (in.available() < bitWidth) { - // unpack8Values needs at least bitWidth bytes to read from, - // We have to fill in 0 byte at the end of encoded bytes. - byte[] tempEncode = new byte[bitWidth]; - in.read(tempEncode, 0, in.available()); - packer.unpack8Values(tempEncode, 0, decoded, 0); - } else { - ByteBuffer encoded = in.slice(bitWidth); - packer.unpack8Values(encoded, encoded.position(), decoded, 0); - } - } catch (IOException e) { - throw new ParquetDecodingException("Failed to read packed values", e); - } - decodedPosition = 0; + readMore(); } return decoded[decodedPosition]; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index 127817eb0c..c14e8236a9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -33,12 +33,13 @@ abstract public class PlainValuesReader extends ValuesReader { private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class); - protected LittleEndianDataInputStream in; + protected ByteBufferInputStream in; @Override public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); - this.in = new LittleEndianDataInputStream(stream.remainingStream()); + // No need for "remainingStream()" since the caller is done with the stream + this.in = stream; } @Override @@ -47,10 +48,7 @@ public void skip() { } void skipBytesFully(int n) throws IOException { - int skipped = 0; - while (skipped < n) { - skipped += in.skipBytes(n - skipped); - } + in.skipFully(n); } public static class DoublePlainValuesReader extends PlainValuesReader { @@ -100,7 +98,7 @@ public static class IntegerPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { try { - in.skipBytes(n * 4); + skipBytesFully(n * 4); } catch (IOException e) { throw new ParquetDecodingException("could not skip " + n + " ints", e); } @@ -121,7 +119,7 @@ public static class LongPlainValuesReader extends PlainValuesReader { @Override public void skip(int n) { try { - in.skipBytes(n * 8); + skipBytesFully(n * 8); } catch (IOException e) { throw new ParquetDecodingException("could not skip " + n + " longs", e); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index 0db266fe24..aa61c876c8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -24,6 +24,7 @@ import java.io.InputStream; import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; import org.apache.parquet.column.values.bitpacking.Packer; @@ -37,13 +38,16 @@ public class RunLengthBitPackingHybridDecoder { private static final Logger LOG = LoggerFactory.getLogger(RunLengthBitPackingHybridDecoder.class); - private static enum MODE { RLE, PACKED } - private final int bitWidth; private final BytePacker packer; - private final InputStream in; + private final ByteBufferInputStream in; + + /* + Note: In an older version, this class used to use an enum to keep track of the mode. Switching to a boolean + resulted in a measurable performance improvement. + */ + boolean packed_mode; - private MODE mode; private int currentCount; private int currentValue; private int[] currentBuffer; @@ -54,39 +58,41 @@ public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); - this.in = in; + // Every place in ParquetMR that calls this constructor does so with a ByteBufferInputStream. If some other + // user calls this with some other type, we can rely on the ClassCastException to be thrown. + this.in = (ByteBufferInputStream)in; } public int readInt() throws IOException { if (currentCount == 0) { readNext(); } - -- currentCount; + --currentCount; int result; - switch (mode) { - case RLE: - result = currentValue; - break; - case PACKED: - result = currentBuffer[currentBuffer.length - 1 - currentCount]; - break; - default: - throw new ParquetDecodingException("not a valid mode " + mode); + if (packed_mode) { + return currentBuffer[currentBuffer.length - 1 - currentCount]; + } else { + return currentValue; } - return result; } + /* + Note: An older version used to create a DataInputStream just to be able to call readFully. This object creation + in the critical path was bad for performance. Since we're using the new ByteBufferInputStream, we can call + its built-in readFully, along with other optimized methods like readUnsignedVarInt and + readIntLittleEndianPaddedOnBitWidth, which replace the use of BytesUtils, which we can't count on being + inlined. + */ + private void readNext() throws IOException { Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream."); - final int header = BytesUtils.readUnsignedVarInt(in); - mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; - switch (mode) { - case RLE: + final int header = in.readUnsignedVarInt(); + packed_mode = (header & 1) != 0; + if (!packed_mode) { currentCount = header >>> 1; LOG.debug("reading {} values RLE", currentCount); - currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth); - break; - case PACKED: + currentValue = in.readIntLittleEndianPaddedOnBitWidth(bitWidth); + } else { int numGroups = header >>> 1; currentCount = numGroups * 8; LOG.debug("reading {} values BIT PACKED", currentCount); @@ -95,13 +101,11 @@ private void readNext() throws IOException { // At the end of the file RLE data though, there might not be that many bytes left. int bytesToRead = (int)Math.ceil(currentCount * bitWidth / 8.0); bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(bytes, 0, bytesToRead); + in.readFully(bytes, 0, bytesToRead); for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { + // It's faster to use an array with unpack8Values than to use a ByteBuffer. packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); } - break; - default: - throw new ParquetDecodingException("not a valid mode " + mode); } } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index bfa9482cb3..497fadf025 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Test; import org.apache.parquet.bytes.DirectByteBufferAllocator; @@ -294,7 +295,7 @@ public void testGroupBoundary() throws Exception { // bit width 2. bytes[0] = (1 << 1 )| 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); + ByteBufferInputStream stream = new ByteBufferInputStream(bytes); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); assertEquals(decoder.readInt(), 1); assertEquals(decoder.readInt(), 2); diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 495cca2ec8..79ae378650 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -28,6 +28,16 @@ import org.apache.parquet.ShouldNeverHappenException; +/* +Changes implemented: +All of the functionality of LittleEndianDataInputStream has been merged into ByteBufferInputStream and its child +classes. This has resulted in measurable performance improvements for the following reasons: +- Elimination of at least one layer of abstraction / method call overhead +- Enabling support for intrinsics for readInt, readLong, etc. +- Eliminate the need for the JIT to make inferences that may or may not inline methods from BytesUtils and + the InputStream.read() that is called by BytesUtils. + */ + public class ByteBufferInputStream extends InputStream { // Used to maintain the deprecated behavior of instantiating ByteBufferInputStream directly @@ -49,6 +59,19 @@ public static ByteBufferInputStream wrap(List buffers) { } } + public static ByteBufferInputStream wrap(ByteBuffer buffer, int offset, int count) { + return new SingleBufferInputStream(buffer, offset, count); + } + + public static ByteBufferInputStream wrap(byte[] buf) { + return new SingleBufferInputStream(buf); + } + + public static ByteBufferInputStream wrap(byte[] buf, int start, int length) { + return new SingleBufferInputStream(buf, start, length); + } + + ByteBufferInputStream() { delegate = null; } @@ -74,11 +97,26 @@ public ByteBufferInputStream(ByteBuffer buffer) { */ @Deprecated public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) { + // This is necessary to pass "TestDeprecatedBufferInputStream"... ByteBuffer temp = buffer.duplicate(); temp.position(offset); ByteBuffer byteBuf = temp.slice(); byteBuf.limit(count); delegate = wrap(byteBuf); + // ... but it would probably be faster to do this: +// delegate = wrap(buffer, offset, count); + } + + public ByteBufferInputStream(byte[] inBuf) { + delegate = wrap(inBuf); + } + + public ByteBufferInputStream(byte[] inBuf, int start, int length) { + delegate = wrap(inBuf, start, length); + } + + public ByteBufferInputStream(List inBufs) { + delegate = wrap(inBufs); } /** @@ -98,12 +136,12 @@ public long position() { return delegate.position(); } + public void position(int pos) { + throw new UnsupportedOperationException(); + } + public void skipFully(long n) throws IOException { - long skipped = skip(n); - if (skipped < n) { - throw new EOFException( - "Not enough bytes to skip: " + skipped + " < " + n); - } + delegate.skipFully(n); } public int read(ByteBuffer out) { @@ -119,7 +157,8 @@ public List sliceBuffers(long length) throws EOFException { } public ByteBufferInputStream sliceStream(long length) throws EOFException { - return ByteBufferInputStream.wrap(sliceBuffers(length)); + return delegate.sliceStream(length); + //return ByteBufferInputStream.wrap(sliceBuffers(length)); } public List remainingBuffers() { @@ -127,7 +166,11 @@ public List remainingBuffers() { } public ByteBufferInputStream remainingStream() { - return ByteBufferInputStream.wrap(remainingBuffers()); + return delegate.remainingStream(); + } + + public ByteBufferInputStream duplicate() { + return delegate.duplicate(); } public int read() throws IOException { @@ -138,14 +181,34 @@ public int read(byte[] b, int off, int len) throws IOException { return delegate.read(b, off, len); } + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public void readFully(byte b[]) throws IOException { + delegate.readFully(b, 0, b.length); + } + + public void readFully(byte b[], int off, int len) throws IOException { + delegate.readFully(b, off, len); + } + public long skip(long n) { return delegate.skip(n); } + public int skipBytes(int n) { + return (int)skip(n); + } + public int available() { return delegate.available(); } + public int remaining() { + return available(); + } + public void mark(int readlimit) { delegate.mark(readlimit); } @@ -157,4 +220,83 @@ public void reset() throws IOException { public boolean markSupported() { return delegate.markSupported(); } + + public void close() throws IOException { + } + + public boolean readBoolean() throws IOException { + return readByte() != 0; + } + + public byte readByte() throws IOException { + return delegate.readByte(); + } + + public int readUnsignedByte() throws IOException { + return delegate.readUnsignedByte(); + } + + public short readShort() throws IOException { + return delegate.readShort(); + } + + public int readUnsignedShort() throws IOException { + return delegate.readUnsignedShort(); + } + + public int readInt() throws IOException { + return delegate.readInt(); + } + + public long readLong() throws IOException { + return delegate.readLong(); + } + + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + public int readIntLittleEndianOnThreeBytes() throws IOException { + int ch1 = readUnsignedByte(); + int ch2 = readUnsignedByte(); + int ch3 = readUnsignedByte(); + return ((ch3 << 16) + (ch2 << 8) + (ch1 << 0)); + } + + public int readIntLittleEndianPaddedOnBitWidth(int bitWidth) + throws IOException { + + int bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth); + switch (bytesWidth) { + case 0: + return 0; + case 1: + return readUnsignedByte(); + case 2: + return readUnsignedShort(); + case 3: + return readIntLittleEndianOnThreeBytes(); + case 4: + return readInt(); + default: + throw new IOException( + String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth)); + } + } + + public int readUnsignedVarInt() throws IOException { + int value = 0; + int i = 0; + int b; + while (((b = readUnsignedByte()) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java index 4338c2458e..e7ad26c408 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java @@ -25,6 +25,7 @@ /** * Based on DataInputStream but little endian and without the String/char methods */ +@Deprecated public final class LittleEndianDataInputStream extends InputStream { private final InputStream in; diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 34fa2505af..810bbba620 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -19,6 +19,8 @@ package org.apache.parquet.bytes; +import org.apache.parquet.ShouldNeverHappenException; + import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -89,6 +91,15 @@ public long skip(long n) { return bytesSkipped; } + @Override + public void skipFully(long n) throws IOException { + if (current == null || n > length) { + throw new EOFException(); + } + + skip(n); + } + @Override public int read(ByteBuffer out) { int len = out.remaining(); @@ -193,6 +204,10 @@ public List sliceBuffers(long len) throws EOFException { return buffers; } + public ByteBufferInputStream sliceStream(long length) throws EOFException { + return ByteBufferInputStream.wrap(sliceBuffers(length)); + } + @Override public List remainingBuffers() { if (position >= length) { @@ -208,12 +223,24 @@ public List remainingBuffers() { } } + public ByteBufferInputStream remainingStream() { + return ByteBufferInputStream.wrap(remainingBuffers()); + } + + @Override + public ByteBufferInputStream duplicate() { + // Nothing ever uses this, and it's complicated to make a duplicate that doesn't cause side effects + // for the original. If this is ever necessary, we can go through the effort of adding this at that time. + throw new UnsupportedOperationException(); + } + @Override public int read(byte[] bytes, int off, int len) { if (len <= 0) { if (len < 0) { throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); } + return 0; } @@ -238,8 +265,31 @@ public int read(byte[] bytes, int off, int len) { } @Override - public int read(byte[] bytes) { - return read(bytes, 0, bytes.length); + public void readFully(byte[] bytes, int off, int len) throws IOException { + if (len <= 0) { + if (len < 0) { + throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); + } + + return; + } + + if (current == null || len > length) { + throw new EOFException(); + } + + int bytesRead = 0; + while (bytesRead < len) { + if (current.remaining() > 0) { + int bytesToRead = Math.min(len - bytesRead, current.remaining()); + current.get(bytes, off + bytesRead, bytesToRead); + bytesRead += bytesToRead; + this.position += bytesToRead; + } else if (!nextBuffer()) { + // there are no more buffers + throw new ShouldNeverHappenException(); + } + } } @Override @@ -248,13 +298,17 @@ public int read() throws IOException { throw new EOFException(); } + this.position += 1; while (true) { - if (current.remaining() > 0) { - this.position += 1; - return current.get() & 0xFF; // as unsigned - } else if (!nextBuffer()) { - // there are no more buffers - throw new EOFException(); + try { + return current.get() & 255; + } catch (Exception e) { + // It has been measured to be faster to rely on ByteBuffer throwing BufferUnderflowException to determine + // when we're run out of bytes in the current buffer. + if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } } } } @@ -274,6 +328,7 @@ public void mark(int readlimit) { if (mark >= 0) { discardMark(); } + this.mark = position; this.markLimit = mark + readlimit + 1; if (current != null) { @@ -313,6 +368,8 @@ private boolean nextBuffer() { } this.current = iterator.next().duplicate(); + // Have to put the buffer in little endian mode, because it defaults to big endian + this.current.order(java.nio.ByteOrder.LITTLE_ENDIAN); if (mark >= 0) { if (position < markLimit) { @@ -379,4 +436,120 @@ public void remove() { second.remove(); } } + + @Override + public byte readByte() throws IOException { + return (byte)readUnsignedByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + if (current == null) { + throw new EOFException(); + } + + this.position += 1; + while (true) { + try { + return current.get() & 255; + } catch (Exception e) { + if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + } + } + + /** + * When reading a short will cross a buffer boundary, read one byte at a time. + * @return a short value + * @throws IOException + */ + private int getShortSlow() throws IOException { + int c0 = readUnsignedByte(); + int c1 = readUnsignedByte(); + return ((c0 << 0) + (c1 << 8)); + } + + public short readShort() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= 2) { + // If the whole short can be read from the current buffer, use intrinsics + this.position += 2; + return current.getShort(); + } else { + // Otherwise get the short one byte at a time + return (short)getShortSlow(); + } + } + + public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** + * When reading an int will cross a buffer boundary, read one byte at a time. + * @return an int value + * @throws IOException + */ + private int getIntSlow() throws IOException { + int c0 = readUnsignedByte(); + int c1 = readUnsignedByte(); + int c2 = readUnsignedByte(); + int c3 = readUnsignedByte(); + return ((c0 << 0) + (c1 << 8)) + ((c2 << 16) + (c3 << 24)); + } + + @Override + public int readInt() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= 4) { + // If the whole int can be read from the current buffer, use intrinsics + this.position += 4; + return current.getInt(); + } else { + // Otherwise get the int one byte at a time + return getIntSlow(); + } + } + + /** + * When reading a long will cross a buffer boundary, read one byte at a time. + * @return a long value + * @throws IOException + */ + private long getLongSlow() throws IOException { + long ch0 = (long)readUnsignedByte() << 0; + long ch1 = (long)readUnsignedByte() << 8; + long ch2 = (long)readUnsignedByte() << 16; + long ch3 = (long)readUnsignedByte() << 24; + long ch4 = (long)readUnsignedByte() << 32; + long ch5 = (long)readUnsignedByte() << 40; + long ch6 = (long)readUnsignedByte() << 48; + long ch7 = (long)readUnsignedByte() << 56; + return ((ch0 + ch1) + (ch2 + ch3)) + ((ch4 + ch5) + (ch6 + ch7)); + } + + @Override + public long readLong() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= 8) { + // If the whole short can be read from the current buffer, use intrinsics + this.position += 8; + return current.getLong(); + } else { + // Otherwise get the long one byte at a time + return getLongSlow(); + } + } } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index 999d1bb4f6..a85828f1fc 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -38,6 +38,34 @@ class SingleBufferInputStream extends ByteBufferInputStream { // duplicate the buffer because its state will be modified this.buffer = buffer.duplicate(); this.startPosition = buffer.position(); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + } + + SingleBufferInputStream(ByteBuffer buffer, int start, int length) { + // duplicate the buffer because its state will be modified + this.buffer = buffer.duplicate(); + this.startPosition = start; + this.buffer.position(start); + this.buffer.limit(start + length); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + } + + SingleBufferInputStream(byte[] inBuf) { + this.buffer = ByteBuffer.wrap(inBuf); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + this.startPosition = 0; + } + + SingleBufferInputStream(byte[] inBuf, int start, int length) { + this.buffer = ByteBuffer.wrap(inBuf, start, length); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + // This seems to be consistent with HeapByteBuffer.wrap(), which leaves + // the internal "offset" at zero and sets the starting position at start. + this.startPosition = 0; + } + + SingleBufferInputStream(List inBufs) { + throw new UnsupportedOperationException(); } @Override @@ -46,12 +74,19 @@ public long position() { return buffer.position() - startPosition; } + /* + Note: For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which + we catch and turn into an EOFException. This is measured to be faster than explicitly checking if the ByteBuffer + has any remaining bytes. + */ + @Override public int read() throws IOException { - if (!buffer.hasRemaining()) { - throw new EOFException(); + try { + return buffer.get() & 0xFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e); } - return buffer.get() & 0xFF; // as unsigned } @Override @@ -70,9 +105,22 @@ public int read(byte[] bytes, int offset, int length) throws IOException { return bytesToRead; } - + + @Override + public void readFully(byte[] bytes, int offset, int length) throws IOException { + try { + buffer.get(bytes, offset, length); + } catch (BufferUnderflowException|IndexOutOfBoundsException e) { + throw new EOFException(e); + } + } + @Override public long skip(long n) { + if (n < 0) { + throw new IllegalArgumentException(); + } + if (n == 0) { return 0; } @@ -88,6 +136,19 @@ public long skip(long n) { return bytesToSkip; } + @Override + public void skipFully(long n) throws IOException { + if (n < 0 || n > Integer.MAX_VALUE) { + throw new IllegalArgumentException(); + } + + try { + buffer.position(buffer.position() + (int)n); + } catch (IllegalArgumentException e) { + throw new EOFException(e); + } + } + @Override public int read(ByteBuffer out) { int bytesToCopy; @@ -150,6 +211,27 @@ public List remainingBuffers() { return Collections.singletonList(remaining); } + @Override + public ByteBufferInputStream remainingStream() { + // Constructor makes duplicate, so we don't have to explicitly make a duplicate here + ByteBufferInputStream remaining = new SingleBufferInputStream(buffer); + buffer.position(buffer.limit()); + return remaining; + } + + @Override + public ByteBufferInputStream sliceStream(long length) throws EOFException { + if (length > buffer.remaining()) throw new EOFException(); + ByteBufferInputStream remaining = new SingleBufferInputStream(buffer, buffer.position(), (int)length); + buffer.position(buffer.position() + (int)length); + return remaining; + } + + @Override + public ByteBufferInputStream duplicate() { + return new SingleBufferInputStream(buffer); + } + @Override public void mark(int readlimit) { this.mark = buffer.position(); @@ -174,4 +256,63 @@ public boolean markSupported() { public int available() { return buffer.remaining(); } + + @Override + public byte readByte() throws IOException { + try { + return buffer.get(); + } catch (BufferUnderflowException e) { + throw new EOFException(e); + } + } + + @Override + public int readUnsignedByte() throws IOException { + try { + return buffer.get() & 0xFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e); + } + } + + @Override + public short readShort() throws IOException { + try { + return buffer.getShort(); + } catch (BufferUnderflowException e) { + throw new EOFException(e); + } + } + + @Override + public int readUnsignedShort() throws IOException { + try { + return buffer.getShort() & 65535; + } catch (BufferUnderflowException e) { + throw new EOFException(e); + } + } + + /* + Note: Unlike LittleEndianDataInputStream, which this replaces, using getInt and getLong on the ByteBuffer + can take advantage of intrinsics, which makes this faster. This has been confirmed by benchmarking. + */ + + @Override + public int readInt() throws IOException { + try { + return buffer.getInt(); + } catch (BufferUnderflowException e) { + throw new EOFException(e); + } + } + + @Override + public long readLong() throws IOException { + try { + return buffer.getLong(); + } catch (BufferUnderflowException e) { + throw new EOFException(e); + } + } }