From fcc2d75a156968eefe70d80a09ee96e4d9b8f2aa Mon Sep 17 00:00:00 2001 From: Tim Miller Date: Mon, 25 Apr 2022 20:50:35 +0000 Subject: [PATCH 01/19] Move all LittleEndianDataInputStream functionality into ByteBufferInputStream To improve performance, all multi-byte access functionality from LittleEndianDataInputStream has been merged into ByteBufferInputStream. LittleEndianDataInputStream is marked deprecated. --- .../parquet/bytes/ByteBufferInputStream.java | 156 ++++++++++++++- .../bytes/LittleEndianDataInputStream.java | 1 + .../parquet/bytes/MultiBufferInputStream.java | 189 +++++++++++++++++- .../bytes/SingleBufferInputStream.java | 153 +++++++++++++- pom.xml | 2 +- 5 files changed, 481 insertions(+), 20 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 495cca2ec8..79ae378650 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -28,6 +28,16 @@ import org.apache.parquet.ShouldNeverHappenException; +/* +Changes implemented: +All of the functionality of LittleEndianDataInputStream has been merged into ByteBufferInputStream and its child +classes. This has resulted in measurable performance improvements for the following reasons: +- Elimination of at least one layer of abstraction / method call overhead +- Enabling support for intrinsics for readInt, readLong, etc. +- Eliminate the need for the JIT to make inferences that may or may not inline methods from BytesUtils and + the InputStream.read() that is called by BytesUtils. 
+ */ + public class ByteBufferInputStream extends InputStream { // Used to maintain the deprecated behavior of instantiating ByteBufferInputStream directly @@ -49,6 +59,19 @@ public static ByteBufferInputStream wrap(List buffers) { } } + public static ByteBufferInputStream wrap(ByteBuffer buffer, int offset, int count) { + return new SingleBufferInputStream(buffer, offset, count); + } + + public static ByteBufferInputStream wrap(byte[] buf) { + return new SingleBufferInputStream(buf); + } + + public static ByteBufferInputStream wrap(byte[] buf, int start, int length) { + return new SingleBufferInputStream(buf, start, length); + } + + ByteBufferInputStream() { delegate = null; } @@ -74,11 +97,26 @@ public ByteBufferInputStream(ByteBuffer buffer) { */ @Deprecated public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) { + // This is necessary to pass "TestDeprecatedBufferInputStream"... ByteBuffer temp = buffer.duplicate(); temp.position(offset); ByteBuffer byteBuf = temp.slice(); byteBuf.limit(count); delegate = wrap(byteBuf); + // ... 
but it would probably be faster to do this: +// delegate = wrap(buffer, offset, count); + } + + public ByteBufferInputStream(byte[] inBuf) { + delegate = wrap(inBuf); + } + + public ByteBufferInputStream(byte[] inBuf, int start, int length) { + delegate = wrap(inBuf, start, length); + } + + public ByteBufferInputStream(List inBufs) { + delegate = wrap(inBufs); } /** @@ -98,12 +136,12 @@ public long position() { return delegate.position(); } + public void position(int pos) { + throw new UnsupportedOperationException(); + } + public void skipFully(long n) throws IOException { - long skipped = skip(n); - if (skipped < n) { - throw new EOFException( - "Not enough bytes to skip: " + skipped + " < " + n); - } + delegate.skipFully(n); } public int read(ByteBuffer out) { @@ -119,7 +157,8 @@ public List sliceBuffers(long length) throws EOFException { } public ByteBufferInputStream sliceStream(long length) throws EOFException { - return ByteBufferInputStream.wrap(sliceBuffers(length)); + return delegate.sliceStream(length); + //return ByteBufferInputStream.wrap(sliceBuffers(length)); } public List remainingBuffers() { @@ -127,7 +166,11 @@ public List remainingBuffers() { } public ByteBufferInputStream remainingStream() { - return ByteBufferInputStream.wrap(remainingBuffers()); + return delegate.remainingStream(); + } + + public ByteBufferInputStream duplicate() { + return delegate.duplicate(); } public int read() throws IOException { @@ -138,14 +181,34 @@ public int read(byte[] b, int off, int len) throws IOException { return delegate.read(b, off, len); } + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public void readFully(byte b[]) throws IOException { + delegate.readFully(b, 0, b.length); + } + + public void readFully(byte b[], int off, int len) throws IOException { + delegate.readFully(b, off, len); + } + public long skip(long n) { return delegate.skip(n); } + public int skipBytes(int n) { + return (int)skip(n); + } + public int 
available() { return delegate.available(); } + public int remaining() { + return available(); + } + public void mark(int readlimit) { delegate.mark(readlimit); } @@ -157,4 +220,83 @@ public void reset() throws IOException { public boolean markSupported() { return delegate.markSupported(); } + + public void close() throws IOException { + } + + public boolean readBoolean() throws IOException { + return readByte() != 0; + } + + public byte readByte() throws IOException { + return delegate.readByte(); + } + + public int readUnsignedByte() throws IOException { + return delegate.readUnsignedByte(); + } + + public short readShort() throws IOException { + return delegate.readShort(); + } + + public int readUnsignedShort() throws IOException { + return delegate.readUnsignedShort(); + } + + public int readInt() throws IOException { + return delegate.readInt(); + } + + public long readLong() throws IOException { + return delegate.readLong(); + } + + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + public int readIntLittleEndianOnThreeBytes() throws IOException { + int ch1 = readUnsignedByte(); + int ch2 = readUnsignedByte(); + int ch3 = readUnsignedByte(); + return ((ch3 << 16) + (ch2 << 8) + (ch1 << 0)); + } + + public int readIntLittleEndianPaddedOnBitWidth(int bitWidth) + throws IOException { + + int bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth); + switch (bytesWidth) { + case 0: + return 0; + case 1: + return readUnsignedByte(); + case 2: + return readUnsignedShort(); + case 3: + return readIntLittleEndianOnThreeBytes(); + case 4: + return readInt(); + default: + throw new IOException( + String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth)); + } + } + + public int readUnsignedVarInt() throws IOException { + int value = 0; + int i = 0; + int b; + while (((b = readUnsignedByte()) & 
0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java index 4338c2458e..e7ad26c408 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java @@ -25,6 +25,7 @@ /** * Based on DataInputStream but little endian and without the String/char methods */ +@Deprecated public final class LittleEndianDataInputStream extends InputStream { private final InputStream in; diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 34fa2505af..810bbba620 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -19,6 +19,8 @@ package org.apache.parquet.bytes; +import org.apache.parquet.ShouldNeverHappenException; + import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -89,6 +91,15 @@ public long skip(long n) { return bytesSkipped; } + @Override + public void skipFully(long n) throws IOException { + if (current == null || n > length) { + throw new EOFException(); + } + + skip(n); + } + @Override public int read(ByteBuffer out) { int len = out.remaining(); @@ -193,6 +204,10 @@ public List sliceBuffers(long len) throws EOFException { return buffers; } + public ByteBufferInputStream sliceStream(long length) throws EOFException { + return ByteBufferInputStream.wrap(sliceBuffers(length)); + } + @Override public List remainingBuffers() { if (position >= length) { @@ -208,12 +223,24 @@ public List remainingBuffers() { } } + public ByteBufferInputStream 
remainingStream() { + return ByteBufferInputStream.wrap(remainingBuffers()); + } + + @Override + public ByteBufferInputStream duplicate() { + // Nothing ever uses this, and it's complicated to make a duplicate that doesn't cause side effects + // for the original. If this is ever necessary, we can go through the effort of adding this at that time. + throw new UnsupportedOperationException(); + } + @Override public int read(byte[] bytes, int off, int len) { if (len <= 0) { if (len < 0) { throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); } + return 0; } @@ -238,8 +265,31 @@ public int read(byte[] bytes, int off, int len) { } @Override - public int read(byte[] bytes) { - return read(bytes, 0, bytes.length); + public void readFully(byte[] bytes, int off, int len) throws IOException { + if (len <= 0) { + if (len < 0) { + throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); + } + + return; + } + + if (current == null || len > length) { + throw new EOFException(); + } + + int bytesRead = 0; + while (bytesRead < len) { + if (current.remaining() > 0) { + int bytesToRead = Math.min(len - bytesRead, current.remaining()); + current.get(bytes, off + bytesRead, bytesToRead); + bytesRead += bytesToRead; + this.position += bytesToRead; + } else if (!nextBuffer()) { + // there are no more buffers + throw new ShouldNeverHappenException(); + } + } } @Override @@ -248,13 +298,17 @@ public int read() throws IOException { throw new EOFException(); } + this.position += 1; while (true) { - if (current.remaining() > 0) { - this.position += 1; - return current.get() & 0xFF; // as unsigned - } else if (!nextBuffer()) { - // there are no more buffers - throw new EOFException(); + try { + return current.get() & 255; + } catch (Exception e) { + // It has been measured to be faster to rely on ByteBuffer throwing BufferUnderflowException to determine + // when we're run out of bytes in the current buffer. 
+ if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } } } } @@ -274,6 +328,7 @@ public void mark(int readlimit) { if (mark >= 0) { discardMark(); } + this.mark = position; this.markLimit = mark + readlimit + 1; if (current != null) { @@ -313,6 +368,8 @@ private boolean nextBuffer() { } this.current = iterator.next().duplicate(); + // Have to put the buffer in little endian mode, because it defaults to big endian + this.current.order(java.nio.ByteOrder.LITTLE_ENDIAN); if (mark >= 0) { if (position < markLimit) { @@ -379,4 +436,120 @@ public void remove() { second.remove(); } } + + @Override + public byte readByte() throws IOException { + return (byte)readUnsignedByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + if (current == null) { + throw new EOFException(); + } + + this.position += 1; + while (true) { + try { + return current.get() & 255; + } catch (Exception e) { + if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + } + } + + /** + * When reading a short will cross a buffer boundary, read one byte at a time. + * @return a short value + * @throws IOException + */ + private int getShortSlow() throws IOException { + int c0 = readUnsignedByte(); + int c1 = readUnsignedByte(); + return ((c0 << 0) + (c1 << 8)); + } + + public short readShort() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= 2) { + // If the whole short can be read from the current buffer, use intrinsics + this.position += 2; + return current.getShort(); + } else { + // Otherwise get the short one byte at a time + return (short)getShortSlow(); + } + } + + public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** + * When reading an int will cross a buffer boundary, read one byte at a time. 
+ * @return an int value + * @throws IOException + */ + private int getIntSlow() throws IOException { + int c0 = readUnsignedByte(); + int c1 = readUnsignedByte(); + int c2 = readUnsignedByte(); + int c3 = readUnsignedByte(); + return ((c0 << 0) + (c1 << 8)) + ((c2 << 16) + (c3 << 24)); + } + + @Override + public int readInt() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= 4) { + // If the whole int can be read from the current buffer, use intrinsics + this.position += 4; + return current.getInt(); + } else { + // Otherwise get the int one byte at a time + return getIntSlow(); + } + } + + /** + * When reading a long will cross a buffer boundary, read one byte at a time. + * @return a long value + * @throws IOException + */ + private long getLongSlow() throws IOException { + long ch0 = (long)readUnsignedByte() << 0; + long ch1 = (long)readUnsignedByte() << 8; + long ch2 = (long)readUnsignedByte() << 16; + long ch3 = (long)readUnsignedByte() << 24; + long ch4 = (long)readUnsignedByte() << 32; + long ch5 = (long)readUnsignedByte() << 40; + long ch6 = (long)readUnsignedByte() << 48; + long ch7 = (long)readUnsignedByte() << 56; + return ((ch0 + ch1) + (ch2 + ch3)) + ((ch4 + ch5) + (ch6 + ch7)); + } + + @Override + public long readLong() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= 8) { + // If the whole short can be read from the current buffer, use intrinsics + this.position += 8; + return current.getLong(); + } else { + // Otherwise get the long one byte at a time + return getLongSlow(); + } + } } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index 999d1bb4f6..0f2d50d84e 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.bytes; +import java.nio.BufferUnderflowException; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,6 +39,34 @@ class SingleBufferInputStream extends ByteBufferInputStream { // duplicate the buffer because its state will be modified this.buffer = buffer.duplicate(); this.startPosition = buffer.position(); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + } + + SingleBufferInputStream(ByteBuffer buffer, int start, int length) { + // duplicate the buffer because its state will be modified + this.buffer = buffer.duplicate(); + this.startPosition = start; + this.buffer.position(start); + this.buffer.limit(start + length); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + } + + SingleBufferInputStream(byte[] inBuf) { + this.buffer = ByteBuffer.wrap(inBuf); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + this.startPosition = 0; + } + + SingleBufferInputStream(byte[] inBuf, int start, int length) { + this.buffer = ByteBuffer.wrap(inBuf, start, length); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + // This seems to be consistent with HeapByteBuffer.wrap(), which leaves + // the internal "offset" at zero and sets the starting position at start. + this.startPosition = 0; + } + + SingleBufferInputStream(List inBufs) { + throw new UnsupportedOperationException(); } @Override @@ -46,12 +75,19 @@ public long position() { return buffer.position() - startPosition; } + /* + Note: For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which + we catch and turn into an EOFException. This is measured to be faster than explicitly checking if the ByteBuffer + has any remaining bytes. 
+ */ + @Override public int read() throws IOException { - if (!buffer.hasRemaining()) { - throw new EOFException(); + try { + return buffer.get() & 0xFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); } - return buffer.get() & 0xFF; // as unsigned } @Override @@ -70,9 +106,22 @@ public int read(byte[] bytes, int offset, int length) throws IOException { return bytesToRead; } - + + @Override + public void readFully(byte[] bytes, int offset, int length) throws IOException { + try { + buffer.get(bytes, offset, length); + } catch (BufferUnderflowException|IndexOutOfBoundsException e) { + throw new EOFException(e.getMessage()); + } + } + @Override public long skip(long n) { + if (n < 0) { + throw new IllegalArgumentException(); + } + if (n == 0) { return 0; } @@ -88,6 +137,21 @@ public long skip(long n) { return bytesToSkip; } + @Override + public void skipFully(long n) throws IOException { + if (n < 0 || n > Integer.MAX_VALUE) { + throw new IllegalArgumentException(); + } + + try { + buffer.position(buffer.position() + (int)n); + } catch (IllegalArgumentException e) { + // Be sure to leave buffer in EOF state + buffer.position(buffer.limit()); + throw new EOFException(e.getMessage()); + } + } + @Override public int read(ByteBuffer out) { int bytesToCopy; @@ -150,6 +214,28 @@ public List remainingBuffers() { return Collections.singletonList(remaining); } + @Override + public ByteBufferInputStream remainingStream() { + // Constructor makes duplicate, so we don't have to explicitly make a duplicate here + ByteBufferInputStream remaining = new SingleBufferInputStream(buffer); + buffer.position(buffer.limit()); + return remaining; + } + + @Override + public ByteBufferInputStream sliceStream(long length) throws EOFException { + if (length > buffer.remaining()) throw new EOFException(); + + ByteBufferInputStream remaining = new SingleBufferInputStream(buffer, buffer.position(), (int)length); + buffer.position(buffer.position() + 
(int)length); + return remaining; + } + + @Override + public ByteBufferInputStream duplicate() { + return new SingleBufferInputStream(buffer); + } + @Override public void mark(int readlimit) { this.mark = buffer.position(); @@ -174,4 +260,63 @@ public boolean markSupported() { public int available() { return buffer.remaining(); } + + @Override + public byte readByte() throws IOException { + try { + return buffer.get(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + @Override + public int readUnsignedByte() throws IOException { + try { + return buffer.get() & 0xFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + @Override + public short readShort() throws IOException { + try { + return buffer.getShort(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + @Override + public int readUnsignedShort() throws IOException { + try { + return buffer.getShort() & 0xFFFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + /* + Note: Unlike LittleEndianDataInputStream, which this replaces, using getInt and getLong on the ByteBuffer + can take advantage of intrinsics, which makes this faster. This has been confirmed by benchmarking. 
+ */ + + @Override + public int readInt() throws IOException { + try { + return buffer.getInt(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + @Override + public long readLong() throws IOException { + try { + return buffer.getLong(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } } diff --git a/pom.xml b/pom.xml index 0af023588a..3322576c7e 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ 0.16.0 h2 0.10.0 - 0.16.0 + 0.15.0 ${thrift.version} 8.4.2 0.9.33 From cbcb139ffa65b98239e9a539ee227fe601971cbb Mon Sep 17 00:00:00 2001 From: Tim Miller Date: Mon, 25 Apr 2022 20:54:24 +0000 Subject: [PATCH 02/19] Unhacked thrift version requirement in pom --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3322576c7e..0af023588a 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ 0.16.0 h2 0.10.0 - 0.15.0 + 0.16.0 ${thrift.version} 8.4.2 0.9.33 From 8c6d66e573eb5903613eb9b59a12f244e263abce Mon Sep 17 00:00:00 2001 From: "Timothy N. Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Mon, 25 Apr 2022 18:58:59 -0400 Subject: [PATCH 03/19] Update MultiBufferInputStream.java Made exception catch more specific for read() and readByte(). 
--- .../org/apache/parquet/bytes/MultiBufferInputStream.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 810bbba620..c4b000c40c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.nio.BufferUnderflowException; class MultiBufferInputStream extends ByteBufferInputStream { private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); @@ -301,8 +302,8 @@ public int read() throws IOException { this.position += 1; while (true) { try { - return current.get() & 255; - } catch (Exception e) { + return current.get() & 0xFF; + } catch (BufferUnderflowException e) { // It has been measured to be faster to rely on ByteBuffer throwing BufferUnderflowException to determine // when we're run out of bytes in the current buffer. 
if (!nextBuffer()) { @@ -451,8 +452,8 @@ public int readUnsignedByte() throws IOException { this.position += 1; while (true) { try { - return current.get() & 255; - } catch (Exception e) { + return current.get() & 0xFF; + } catch (BufferUnderflowException e) { if (!nextBuffer()) { // there are no more buffers throw new EOFException(); From 832c0094d430cbec8290ee9f74ece1fc6faaa0cb Mon Sep 17 00:00:00 2001 From: Tim Miller Date: Tue, 26 Apr 2022 21:38:15 +0000 Subject: [PATCH 04/19] Added tests for ByteBufferInputStream Also fixed bug discovered by that testing --- .../parquet/bytes/ByteBufferInputStream.java | 2 +- .../bytes/TestByteBufferInputStreams.java | 112 ++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 79ae378650..5f7a3ef4cb 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -186,7 +186,7 @@ public int read(byte[] b) throws IOException { } public void readFully(byte b[]) throws IOException { - delegate.readFully(b, 0, b.length); + readFully(b, 0, b.length); } public void readFully(byte b[], int off, int len) throws IOException { diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java index 0dc565f0f3..f4d5ea1a96 100644 --- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java @@ -53,6 +53,17 @@ public void testRead0() throws Exception { 0, stream.read(bytes)); } + @Test + public void testSkip0() throws Exception { + ByteBufferInputStream stream = newStream(); + + long 
bytesRead = stream.skip(100); + Assert.assertTrue("Should skip to end of stream", bytesRead < 100); + + Assert.assertEquals("Should skip 0 bytes at end of stream", + 0, stream.skip(0)); + } + @Test public void testReadAll() throws Exception { byte[] bytes = new byte[35]; @@ -80,6 +91,31 @@ public void testReadAll() throws Exception { checkOriginalData(); } + @Test + public void testReadFully() throws Exception { + byte[] bytes = new byte[35]; + + ByteBufferInputStream stream = newStream(); + + stream.readFully(bytes); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + Assert.assertEquals("Should advance position", 35, stream.position()); + } + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + checkOriginalData(); + } + @Test public void testSmallReads() throws Exception { for (int size = 1; size < 36; size += 1) { @@ -181,6 +217,82 @@ public void testReadByte() throws Exception { checkOriginalData(); } + @Test + public void testReadShort() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + int length2 = length & -2; // Only read a whole multiple of 2 bytes + int residual = length & 1; + + for (int i = 0; i < length2; i += 2) { + Assert.assertEquals("Position should increment", i, stream.position()); + short buffer_value = stream.readShort(); + short expected_value = (short)(i + ((i+1)<<8)); + Assert.assertEquals(expected_value, buffer_value); + } + + // Read the rest of the bytes + for (int i = 0; i < residual; i += 1) { + stream.read(); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + + @Test + public void testReadInt() throws Exception { 
+ final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + int length2 = length & -4; // Only read a whole multiple of 4 bytes + int residual = length & 3; + + for (int i = 0; i < length2; i += 4) { + Assert.assertEquals("Position should increment", i, stream.position()); + int buffer_value = stream.readInt(); + int expected_value = i + ((i+1)<<8) + ((i+2)<<16) + ((i+3)<<24); + Assert.assertEquals(expected_value, buffer_value); + } + + // Read the rest of the bytes + for (int i = 0; i < residual; i += 1) { + stream.read(); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + + @Test + public void testReadLong() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + int length2 = length & -8; // Only read a whole multiple of 8 bytes + int residual = length & 7; + + for (long i = 0; i < length2; i += 8) { + Assert.assertEquals("Position should increment", i, stream.position()); + long buffer_value = stream.readLong(); + long expected_value = i + ((i+1)<<8) + ((i+2)<<16) + ((i+3)<<24) + + ((i+4)<<32) + ((i+5)<<40) + ((i+6)<<48) + ((i+7)<<56); + Assert.assertEquals(expected_value, buffer_value); + } + + // Read the rest of the bytes + for (int i = 0; i < residual; i += 1) { + stream.read(); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + @Test public void testSlice() throws Exception { ByteBufferInputStream stream = newStream(); From 0dff9a63f4c09cdd0c44166a1bcb4630c8acfdac Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 15:24:27 -0400 Subject: [PATCH 05/19] Update ByteBufferInputStream.java Removed unnecessary comment Removed unused wrapper method --- .../parquet/bytes/ByteBufferInputStream.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 5f7a3ef4cb..43f69ce57c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -28,16 +28,6 @@ import org.apache.parquet.ShouldNeverHappenException; -/* -Changes implemented: -All of the functionality of LittleEndianDataInputStream has been merged into ByteBufferInputStream and its child -classes. This has resulted in measurable performance improvements for the following reasons: -- Elimination of at least one layer of abstraction / method call overhead -- Enabling support for intrinsics for readInt, readLong, etc. -- Eliminate the need for the JIT to make inferences that may or may not inline methods from BytesUtils and - the InputStream.read() that is called by BytesUtils. - */ - public class ByteBufferInputStream extends InputStream { // Used to maintain the deprecated behavior of instantiating ByteBufferInputStream directly @@ -59,10 +49,6 @@ public static ByteBufferInputStream wrap(List buffers) { } } - public static ByteBufferInputStream wrap(ByteBuffer buffer, int offset, int count) { - return new SingleBufferInputStream(buffer, offset, count); - } - public static ByteBufferInputStream wrap(byte[] buf) { return new SingleBufferInputStream(buf); } From cf60c0248a63e2741cf577df29e4baa7844c8c3c Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 15:38:19 -0400 Subject: [PATCH 06/19] Update ByteBufferInputStream.java Removed unused wrappers and constructors --- .../parquet/bytes/ByteBufferInputStream.java | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 43f69ce57c..072d15f6e0 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -49,15 +49,6 @@ public static ByteBufferInputStream wrap(List buffers) { } } - public static ByteBufferInputStream wrap(byte[] buf) { - return new SingleBufferInputStream(buf); - } - - public static ByteBufferInputStream wrap(byte[] buf, int start, int length) { - return new SingleBufferInputStream(buf, start, length); - } - - ByteBufferInputStream() { delegate = null; } @@ -83,26 +74,11 @@ public ByteBufferInputStream(ByteBuffer buffer) { */ @Deprecated public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) { - // This is necessary to pass "TestDeprecatedBufferInputStream"... ByteBuffer temp = buffer.duplicate(); temp.position(offset); ByteBuffer byteBuf = temp.slice(); byteBuf.limit(count); delegate = wrap(byteBuf); - // ... but it would probably be faster to do this: -// delegate = wrap(buffer, offset, count); - } - - public ByteBufferInputStream(byte[] inBuf) { - delegate = wrap(inBuf); - } - - public ByteBufferInputStream(byte[] inBuf, int start, int length) { - delegate = wrap(inBuf, start, length); - } - - public ByteBufferInputStream(List inBufs) { - delegate = wrap(inBufs); } /** From 4ed532c1408fa1bfbad54c5085b4420e8cba7b03 Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 15:51:34 -0400 Subject: [PATCH 07/19] Update ByteBufferInputStream.java Removed line of code that was commented out instead of properly deleted --- .../java/org/apache/parquet/bytes/ByteBufferInputStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 072d15f6e0..409684c32f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -120,7 +120,6 @@ public List sliceBuffers(long length) throws EOFException { public ByteBufferInputStream sliceStream(long length) throws EOFException { return delegate.sliceStream(length); - //return ByteBufferInputStream.wrap(sliceBuffers(length)); } public List remainingBuffers() { From 927050183bf914291f9460af86c00439ab7a3f8f Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 15:55:47 -0400 Subject: [PATCH 08/19] Update SingleBufferInputStream.java Cleaned up some comments --- .../apache/parquet/bytes/SingleBufferInputStream.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index 0f2d50d84e..a82348fb60 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -76,11 +76,10 @@ public long position() { } /* - Note: For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which + For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which we catch and turn into an EOFException. This is measured to be faster than explicitly checking if the ByteBuffer has any remaining bytes. */ - @Override public int read() throws IOException { try { @@ -298,10 +297,8 @@ public int readUnsignedShort() throws IOException { } /* - Note: Unlike LittleEndianDataInputStream, which this replaces, using getInt and getLong on the ByteBuffer - can take advantage of intrinsics, which makes this faster. This has been confirmed by benchmarking. + Use ByteBuffer.getInt(), which takes advantage of platform intrinsics */ - @Override public int readInt() throws IOException { try { @@ -311,6 +308,9 @@ public int readInt() throws IOException { } } + /* + Use ByteBuffer.getLonmg(), which takes advantage of platform intrinsics + */ @Override public long readLong() throws IOException { try { From a793ee3b1ecd018fd3a68b92895cdd7bb9cb4467 Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 16:38:03 -0400 Subject: [PATCH 09/19] Update ByteBufferInputStream.java Removed some more unused methods --- .../parquet/bytes/ByteBufferInputStream.java | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 409684c32f..839d11c1b3 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -98,10 +98,6 @@ public long position() { return delegate.position(); } - public void position(int pos) { - throw new UnsupportedOperationException(); - } - public void skipFully(long n) throws IOException { delegate.skipFully(n); } @@ -130,10 +126,6 @@ public ByteBufferInputStream remainingStream() { return delegate.remainingStream(); } - public ByteBufferInputStream duplicate() { - return delegate.duplicate(); - } - public int read() throws IOException { return delegate.read(); } @@ -158,18 +150,10 @@ public long skip(long n) { return delegate.skip(n); } - public int skipBytes(int n) { - return (int)skip(n); - } - public int available() { return delegate.available(); } - public int remaining() { - return available(); - } - public void mark(int readlimit) { delegate.mark(readlimit); } @@ -182,9 +166,6 @@ public boolean markSupported() { return delegate.markSupported(); } - public void close() throws IOException { - } - public boolean readBoolean() throws IOException { return readByte() != 0; } From 2adac332357a5d5720ad28908d27439b6927288d Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 16:38:48 -0400 Subject: [PATCH 10/19] Update LittleEndianDataInputStream.java @Deprecated postponed to a future PR --- .../org/apache/parquet/bytes/LittleEndianDataInputStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java index e7ad26c408..4338c2458e 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java @@ -25,7 +25,6 @@ /** * Based on DataInputStream but little endian and without the String/char methods */ -@Deprecated public final class LittleEndianDataInputStream extends InputStream { private final InputStream in; From 491dd6c9bcd17e726e28e19e9220454af3963f1d Mon Sep 17 00:00:00 2001 From: "Timothy N. Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 16:42:06 -0400 Subject: [PATCH 11/19] Update MultiBufferInputStream.java Removed unnecessary methods. Reverted whitespace change. 
--- .../org/apache/parquet/bytes/MultiBufferInputStream.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index c4b000c40c..3679a892b6 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -20,7 +20,6 @@ package org.apache.parquet.bytes; import org.apache.parquet.ShouldNeverHappenException; - import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -228,13 +227,6 @@ public ByteBufferInputStream remainingStream() { return ByteBufferInputStream.wrap(remainingBuffers()); } - @Override - public ByteBufferInputStream duplicate() { - // Nothing ever uses this, and it's complicated to make a duplicate that doesn't cause side effects - // for the original. If this is ever necessary, we can go through the effort of adding this at that time. - throw new UnsupportedOperationException(); - } - @Override public int read(byte[] bytes, int off, int len) { if (len <= 0) { @@ -329,7 +321,6 @@ public void mark(int readlimit) { if (mark >= 0) { discardMark(); } - this.mark = position; this.markLimit = mark + readlimit + 1; if (current != null) { From 20420851b6772662804882719543a911fdc74a4a Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 16:45:45 -0400 Subject: [PATCH 12/19] Update SingleBufferInputStream.java Removed unnecessary methods --- .../org/apache/parquet/bytes/SingleBufferInputStream.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index a82348fb60..f0d85785f1 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -230,11 +230,6 @@ public ByteBufferInputStream sliceStream(long length) throws EOFException { return remaining; } - @Override - public ByteBufferInputStream duplicate() { - return new SingleBufferInputStream(buffer); - } - @Override public void mark(int readlimit) { this.mark = buffer.position(); From d59b1f9eebfd52c824bc58616f998e5cbbddf304 Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 27 Apr 2022 16:51:10 -0400 Subject: [PATCH 13/19] Update MultiBufferInputStream.java Removed whitespace change --- .../java/org/apache/parquet/bytes/MultiBufferInputStream.java | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 3679a892b6..31568b5acd 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -233,7 +233,6 @@ public int read(byte[] bytes, int off, int len) { if (len < 0) { throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); } - return 0; } From 8e8bde1546ca4ea383ee58df59b9b56cc5218c94 Mon Sep 17 00:00:00 2001 From: "Timothy N. Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Mon, 25 Jul 2022 11:30:17 -0400 Subject: [PATCH 14/19] Update MultiBufferInputStream.java Use constants for byte size of words --- .../apache/parquet/bytes/MultiBufferInputStream.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 31568b5acd..de26135b6b 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -468,9 +468,9 @@ public short readShort() throws IOException { throw new EOFException(); } - if (current.remaining() >= 2) { + if (current.remaining() >= Short.BYTES) { // If the whole short can be read from the current buffer, use intrinsics - this.position += 2; + this.position += Short.BYTES; return current.getShort(); } else { // 
Otherwise get the short one byte at a time @@ -501,9 +501,9 @@ public int readInt() throws IOException { throw new EOFException(); } - if (current.remaining() >= 4) { + if (current.remaining() >= Integer.BYTES) { // If the whole int can be read from the current buffer, use intrinsics - this.position += 4; + this.position += Integer.BYTES; return current.getInt(); } else { // Otherwise get the int one byte at a time @@ -534,9 +534,9 @@ public long readLong() throws IOException { throw new EOFException(); } - if (current.remaining() >= 8) { + if (current.remaining() >= Long.BYTES) { // If the whole short can be read from the current buffer, use intrinsics - this.position += 8; + this.position += Long.BYTES; return current.getLong(); } else { // Otherwise get the long one byte at a time From 736aedd22acb208f3c9ebf48eb07efb8c79007fb Mon Sep 17 00:00:00 2001 From: "Timothy N. Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Mon, 1 Aug 2022 11:36:14 -0400 Subject: [PATCH 15/19] Update SingleBufferInputStream.java Minor code changes on request. Removed redundant code. Fixed code formatting nits. More informative exceptions. Removed constructor that just throws exception. 
--- .../bytes/SingleBufferInputStream.java | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index f0d85785f1..9a316e5cd3 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -65,28 +65,15 @@ class SingleBufferInputStream extends ByteBufferInputStream { this.startPosition = 0; } - SingleBufferInputStream(List inBufs) { - throw new UnsupportedOperationException(); - } - @Override public long position() { // position is relative to the start of the stream, not the buffer return buffer.position() - startPosition; } - /* - For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which - we catch and turn into an EOFException. This is measured to be faster than explicitly checking if the ByteBuffer - has any remaining bytes. 
- */ @Override public int read() throws IOException { - try { - return buffer.get() & 0xFF; - } catch (BufferUnderflowException e) { - throw new EOFException(e.getMessage()); - } + return readUnsignedByte(); } @Override @@ -110,7 +97,7 @@ public int read(byte[] bytes, int offset, int length) throws IOException { public void readFully(byte[] bytes, int offset, int length) throws IOException { try { buffer.get(bytes, offset, length); - } catch (BufferUnderflowException|IndexOutOfBoundsException e) { + } catch (BufferUnderflowException | IndexOutOfBoundsException e) { throw new EOFException(e.getMessage()); } } @@ -118,7 +105,7 @@ public void readFully(byte[] bytes, int offset, int length) throws IOException { @Override public long skip(long n) { if (n < 0) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Invalid input for skip: " + n); } if (n == 0) { @@ -139,7 +126,7 @@ public long skip(long n) { @Override public void skipFully(long n) throws IOException { if (n < 0 || n > Integer.MAX_VALUE) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Invalid input for skipFully: " + n); } try { @@ -255,6 +242,11 @@ public int available() { return buffer.remaining(); } + /* + For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which + we catch and turn into an EOFException. This is measured to be faster than explicitly checking if the ByteBuffer + has any remaining bytes. 
+ */ @Override public byte readByte() throws IOException { try { @@ -273,6 +265,9 @@ public int readUnsignedByte() throws IOException { } } + /* + Use ByteBuffer.getShort(), which takes advantage of platform intrinsics + */ @Override public short readShort() throws IOException { try { @@ -304,7 +299,7 @@ public int readInt() throws IOException { } /* - Use ByteBuffer.getLonmg(), which takes advantage of platform intrinsics + Use ByteBuffer.getLong(), which takes advantage of platform intrinsics */ @Override public long readLong() throws IOException { From 39cb3f39710edb382c844d8f5271d68252702b2b Mon Sep 17 00:00:00 2001 From: "Timothy N. Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Mon, 1 Aug 2022 11:39:19 -0400 Subject: [PATCH 16/19] Update MultiBufferInputStream.java Minor code changes on request. Removed redundant code. Fixed code formatting nits. More informative exceptions. --- .../parquet/bytes/MultiBufferInputStream.java | 40 ++++++------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index de26135b6b..1fda43aa57 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -94,7 +94,7 @@ public long skip(long n) { @Override public void skipFully(long n) throws IOException { if (current == null || n > length) { - throw new EOFException(); + throw new EOFException("Not enough bytes to skip: " + length + " < " + n)); } skip(n); @@ -286,23 +286,7 @@ public void readFully(byte[] bytes, int off, int len) throws IOException { @Override public int read() throws IOException { - if (current == null) { - throw new EOFException(); - } - - this.position += 1; - while (true) { - try { - return current.get() & 0xFF; - } catch 
(BufferUnderflowException e) { - // It has been measured to be faster to rely on ByteBuffer throwing BufferUnderflowException to determine - // when we're run out of bytes in the current buffer. - if (!nextBuffer()) { - // there are no more buffers - throw new EOFException(); - } - } - } + return (byte) readUnsignedByte(); } @Override @@ -430,7 +414,7 @@ public void remove() { @Override public byte readByte() throws IOException { - return (byte)readUnsignedByte(); + return (byte) readUnsignedByte(); } @Override @@ -474,7 +458,7 @@ public short readShort() throws IOException { return current.getShort(); } else { // Otherwise get the short one byte at a time - return (short)getShortSlow(); + return (short) getShortSlow(); } } @@ -517,14 +501,14 @@ public int readInt() throws IOException { * @throws IOException */ private long getLongSlow() throws IOException { - long ch0 = (long)readUnsignedByte() << 0; - long ch1 = (long)readUnsignedByte() << 8; - long ch2 = (long)readUnsignedByte() << 16; - long ch3 = (long)readUnsignedByte() << 24; - long ch4 = (long)readUnsignedByte() << 32; - long ch5 = (long)readUnsignedByte() << 40; - long ch6 = (long)readUnsignedByte() << 48; - long ch7 = (long)readUnsignedByte() << 56; + long ch0 = (long) readUnsignedByte() << 0; + long ch1 = (long) readUnsignedByte() << 8; + long ch2 = (long) readUnsignedByte() << 16; + long ch3 = (long) readUnsignedByte() << 24; + long ch4 = (long) readUnsignedByte() << 32; + long ch5 = (long) readUnsignedByte() << 40; + long ch6 = (long) readUnsignedByte() << 48; + long ch7 = (long) readUnsignedByte() << 56; return ((ch0 + ch1) + (ch2 + ch3)) + ((ch4 + ch5) + (ch6 + ch7)); } From a3da888267f1d35540531f18429c4604f493f122 Mon Sep 17 00:00:00 2001 From: Timothy Miller Date: Wed, 17 Aug 2022 10:37:18 -0400 Subject: [PATCH 17/19] Added tests for two as-yet unused constructors for SingleBufferInputStream. 
Rather than deleting these, I decided to keep the constructs as documentation on how to do these things according to how HeapByteBuffer wraps arrays. And it would be best if we didn't have to suffer the overhead of creating a ByteBuffer first in order to create a SingleBufferInputStream. --- ...BufferInputStreamByteArrayConstructor.java | 36 +++++++++++++++++++ ...treamByteArrayConstructorOffsetLength.java | 36 +++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java new file mode 100644 index 0000000000..beaf7c908c --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.bytes; + +public class TestSingleBufferInputStreamByteArrayConstructor extends TestByteBufferInputStreams { + static final byte[] DATA = new byte[] { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 }; + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + // Nothing to do + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java new file mode 100644 index 0000000000..b7cf52fa37 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.bytes; + +public class TestSingleBufferInputStreamByteArrayConstructorOffsetLength extends TestByteBufferInputStreams { + static final byte[] DATA = new byte[] { + (byte) 0xff, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, (byte) 0xfe }; + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA, 1, 35); + } + + @Override + protected void checkOriginalData() { + // Nothing to do + } +} From d94ad68590018e53cfb73e836292906c5285f174 Mon Sep 17 00:00:00 2001 From: Timothy Miller Date: Wed, 17 Aug 2022 16:52:02 -0400 Subject: [PATCH 18/19] Fixed compile and test failures --- .../parquet/bytes/MultiBufferInputStream.java | 4 ++-- .../parquet/bytes/SingleBufferInputStream.java | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 1fda43aa57..0901e99006 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -94,7 +94,7 @@ public long skip(long n) { @Override public void skipFully(long n) throws IOException { if (current == null || n > length) { - throw new EOFException("Not enough bytes to skip: " + length + " < " + n)); + throw new EOFException("Not enough bytes to skip: " + length + " < " + n); } skip(n); @@ -286,7 +286,7 @@ public void readFully(byte[] bytes, int off, int len) throws IOException { @Override public int read() throws IOException { - return (byte) readUnsignedByte(); + return readUnsignedByte(); } @Override diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java 
b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index 9a316e5cd3..5733495a77 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -58,10 +58,12 @@ class SingleBufferInputStream extends ByteBufferInputStream { } SingleBufferInputStream(byte[] inBuf, int start, int length) { - this.buffer = ByteBuffer.wrap(inBuf, start, length); + // Without slice() here, TestByteBufferInputStreams.testSliceBuffersModification + // for TestSingleBufferInputStreamByteArrayConstructorOffsetLength + // has a failure because it dubiously assumes that the ByteBuffer's limits + // are the same as the backing array. + this.buffer = ByteBuffer.wrap(inBuf, start, length).slice(); this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); - // This seems to be consistent with HeapByteBuffer.wrap(), which leaves - // the internal "offset" at zero and sets the starting position at start. this.startPosition = 0; } @@ -166,10 +168,10 @@ public ByteBuffer slice(int length) throws EOFException { throw new EOFException(); } - // length is less than remaining, so it must fit in an int ByteBuffer copy = buffer.duplicate(); - copy.limit(copy.position() + length); - buffer.position(buffer.position() + length); + int en = buffer.position() + length; + copy.limit(en); + buffer.position(en); return copy; } From b7049bc2aa6191a0efcc99fc606e54872c7b9ee4 Mon Sep 17 00:00:00 2001 From: "Timothy N. 
Miller" <94654785+theosib-amazon@users.noreply.github.com> Date: Wed, 7 Sep 2022 12:37:16 -0400 Subject: [PATCH 19/19] Update ByteBufferInputStream.java Changed byte b[] to byte[] b --- .../java/org/apache/parquet/bytes/ByteBufferInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 839d11c1b3..be5e210f7c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -138,7 +138,7 @@ public int read(byte[] b) throws IOException { return read(b, 0, b.length); } - public void readFully(byte b[]) throws IOException { + public void readFully(byte[] b) throws IOException { readFully(b, 0, b.length); }