diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 495cca2ec8..be5e210f7c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -99,11 +99,7 @@ public long position() { } public void skipFully(long n) throws IOException { - long skipped = skip(n); - if (skipped < n) { - throw new EOFException( - "Not enough bytes to skip: " + skipped + " < " + n); - } + delegate.skipFully(n); } public int read(ByteBuffer out) { @@ -119,7 +115,7 @@ public List sliceBuffers(long length) throws EOFException { } public ByteBufferInputStream sliceStream(long length) throws EOFException { - return ByteBufferInputStream.wrap(sliceBuffers(length)); + return delegate.sliceStream(length); } public List remainingBuffers() { @@ -127,7 +123,7 @@ public List remainingBuffers() { } public ByteBufferInputStream remainingStream() { - return ByteBufferInputStream.wrap(remainingBuffers()); + return delegate.remainingStream(); } public int read() throws IOException { @@ -138,6 +134,18 @@ public int read(byte[] b, int off, int len) throws IOException { return delegate.read(b, off, len); } + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + public void readFully(byte b[], int off, int len) throws IOException { + delegate.readFully(b, off, len); + } + public long skip(long n) { return delegate.skip(n); } @@ -157,4 +165,80 @@ public void reset() throws IOException { public boolean markSupported() { return delegate.markSupported(); } + + public boolean readBoolean() throws IOException { + return readByte() != 0; + } + + public byte readByte() throws IOException { + return delegate.readByte(); + } + + public int readUnsignedByte() throws IOException { + return delegate.readUnsignedByte(); + } + + public short readShort() throws IOException { + return delegate.readShort(); + } + + public int readUnsignedShort() throws IOException { + return delegate.readUnsignedShort(); + } + + public int readInt() throws IOException { + return delegate.readInt(); + } + + public long readLong() throws IOException { + return delegate.readLong(); + } + + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + public int readIntLittleEndianOnThreeBytes() throws IOException { + int ch1 = readUnsignedByte(); + int ch2 = readUnsignedByte(); + int ch3 = readUnsignedByte(); + return ((ch3 << 16) + (ch2 << 8) + (ch1 << 0)); + } + + public int readIntLittleEndianPaddedOnBitWidth(int bitWidth) + throws IOException { + + int bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth); + switch (bytesWidth) { + case 0: + return 0; + case 1: + return readUnsignedByte(); + case 2: + return readUnsignedShort(); + case 3: + return readIntLittleEndianOnThreeBytes(); + case 4: + return readInt(); + default: + throw new IOException( + String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth)); + } + } + + public int readUnsignedVarInt() throws IOException { + int value = 0; + int i = 0; + int b; + while (((b = readUnsignedByte()) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 34fa2505af..0901e99006 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -19,6 +19,7 @@ package org.apache.parquet.bytes; +import org.apache.parquet.ShouldNeverHappenException; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -27,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.nio.BufferUnderflowException; class MultiBufferInputStream extends ByteBufferInputStream { private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); @@ -89,6 +91,15 @@ public long skip(long n) { return bytesSkipped; } + @Override + public void skipFully(long n) throws IOException { + if (current == null || n > length) { + throw new EOFException("Not enough bytes to skip: " + length + " < " + n); + } + + skip(n); + } + @Override public int read(ByteBuffer out) { int len = out.remaining(); @@ -193,6 +204,10 @@ public List sliceBuffers(long len) throws EOFException { return buffers; } + public ByteBufferInputStream sliceStream(long length) throws EOFException { + return ByteBufferInputStream.wrap(sliceBuffers(length)); + } + @Override public List remainingBuffers() { if (position >= length) { @@ -208,6 +223,10 @@ public List remainingBuffers() { } } + public ByteBufferInputStream remainingStream() { + return ByteBufferInputStream.wrap(remainingBuffers()); + } + @Override public int read(byte[] bytes, int off, int len) { if (len <= 0) { @@ -238,27 +257,38 @@ public int read(byte[] bytes, int off, int len) { } @Override - public int read(byte[] bytes) { - return read(bytes, 0, bytes.length); - } + public void readFully(byte[] bytes, int off, int len) throws IOException { + if (len <= 0) { + if (len < 0) { + throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); + } + + return; + } - @Override - public int read() throws IOException { - if (current == null) { + if (current == null || len > length) { throw new EOFException(); } - while (true) { + int bytesRead = 0; + while (bytesRead < len) { if (current.remaining() > 0) { - this.position += 1; - return current.get() & 0xFF; // as unsigned + int bytesToRead = Math.min(len - bytesRead, current.remaining()); + current.get(bytes, off + bytesRead, bytesToRead); + bytesRead += bytesToRead; + this.position += bytesToRead; } else if (!nextBuffer()) { // there are no more buffers - throw new EOFException(); + throw new ShouldNeverHappenException(); } } } + @Override + public int read() throws IOException { + return readUnsignedByte(); + } + @Override public int available() { long remaining = length - position; @@ -313,6 +343,8 @@ private boolean nextBuffer() { } this.current = iterator.next().duplicate(); + // Have to put the buffer in little endian mode, because it defaults to big endian + this.current.order(java.nio.ByteOrder.LITTLE_ENDIAN); if (mark >= 0) { if (position < markLimit) { @@ -379,4 +411,120 @@ public void remove() { second.remove(); } } + + @Override + public byte readByte() throws IOException { + return (byte) readUnsignedByte(); + } + + @Override + public int readUnsignedByte() throws IOException { + if (current == null) { + throw new EOFException(); + } + + this.position += 1; + while (true) { + try { + return current.get() & 0xFF; + } catch (BufferUnderflowException e) { + if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + } + } + + /** + * When reading a short will cross a buffer boundary, read one byte at a time. + * @return a short value + * @throws IOException + */ + private int getShortSlow() throws IOException { + int c0 = readUnsignedByte(); + int c1 = readUnsignedByte(); + return ((c0 << 0) + (c1 << 8)); + } + + public short readShort() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= Short.BYTES) { + // If the whole short can be read from the current buffer, use intrinsics + this.position += Short.BYTES; + return current.getShort(); + } else { + // Otherwise get the short one byte at a time + return (short) getShortSlow(); + } + } + + public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** + * When reading an int will cross a buffer boundary, read one byte at a time. + * @return an int value + * @throws IOException + */ + private int getIntSlow() throws IOException { + int c0 = readUnsignedByte(); + int c1 = readUnsignedByte(); + int c2 = readUnsignedByte(); + int c3 = readUnsignedByte(); + return ((c0 << 0) + (c1 << 8)) + ((c2 << 16) + (c3 << 24)); + } + + @Override + public int readInt() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= Integer.BYTES) { + // If the whole int can be read from the current buffer, use intrinsics + this.position += Integer.BYTES; + return current.getInt(); + } else { + // Otherwise get the int one byte at a time + return getIntSlow(); + } + } + + /** + * When reading a long will cross a buffer boundary, read one byte at a time. + * @return a long value + * @throws IOException + */ + private long getLongSlow() throws IOException { + long ch0 = (long) readUnsignedByte() << 0; + long ch1 = (long) readUnsignedByte() << 8; + long ch2 = (long) readUnsignedByte() << 16; + long ch3 = (long) readUnsignedByte() << 24; + long ch4 = (long) readUnsignedByte() << 32; + long ch5 = (long) readUnsignedByte() << 40; + long ch6 = (long) readUnsignedByte() << 48; + long ch7 = (long) readUnsignedByte() << 56; + return ((ch0 + ch1) + (ch2 + ch3)) + ((ch4 + ch5) + (ch6 + ch7)); + } + + @Override + public long readLong() throws IOException { + if (current == null) { + throw new EOFException(); + } + + if (current.remaining() >= Long.BYTES) { + // If the whole short can be read from the current buffer, use intrinsics + this.position += Long.BYTES; + return current.getLong(); + } else { + // Otherwise get the long one byte at a time + return getLongSlow(); + } + } } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java index 999d1bb4f6..5733495a77 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.bytes; +import java.nio.BufferUnderflowException; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,6 +39,32 @@ class SingleBufferInputStream extends ByteBufferInputStream { // duplicate the buffer because its state will be modified this.buffer = buffer.duplicate(); this.startPosition = buffer.position(); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + } + + SingleBufferInputStream(ByteBuffer buffer, int start, int length) { + // duplicate the buffer because its state will be modified + this.buffer = buffer.duplicate(); + this.startPosition = start; + this.buffer.position(start); + this.buffer.limit(start + length); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + } + + SingleBufferInputStream(byte[] inBuf) { + this.buffer = ByteBuffer.wrap(inBuf); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + this.startPosition = 0; + } + + SingleBufferInputStream(byte[] inBuf, int start, int length) { + // Without slice() here, TestByteBufferInputStreams.testSliceBuffersModification + // for TestSingleBufferInputStreamByteArrayConstructorOffsetLength + // has a failure because it dubiously assumes that the ByteBuffer's limits + // are the same as the backing array. + this.buffer = ByteBuffer.wrap(inBuf, start, length).slice(); + this.buffer.order(java.nio.ByteOrder.LITTLE_ENDIAN); + this.startPosition = 0; } @Override @@ -48,10 +75,7 @@ public long position() { @Override public int read() throws IOException { - if (!buffer.hasRemaining()) { - throw new EOFException(); - } - return buffer.get() & 0xFF; // as unsigned + return readUnsignedByte(); } @Override @@ -70,9 +94,22 @@ public int read(byte[] bytes, int offset, int length) throws IOException { return bytesToRead; } - + + @Override + public void readFully(byte[] bytes, int offset, int length) throws IOException { + try { + buffer.get(bytes, offset, length); + } catch (BufferUnderflowException | IndexOutOfBoundsException e) { + throw new EOFException(e.getMessage()); + } + } + @Override public long skip(long n) { + if (n < 0) { + throw new IllegalArgumentException("Invalid input for skip: " + n); + } + if (n == 0) { return 0; } @@ -88,6 +125,21 @@ public long skip(long n) { return bytesToSkip; } + @Override + public void skipFully(long n) throws IOException { + if (n < 0 || n > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Invalid input for skipFully: " + n); + } + + try { + buffer.position(buffer.position() + (int)n); + } catch (IllegalArgumentException e) { + // Be sure to leave buffer in EOF state + buffer.position(buffer.limit()); + throw new EOFException(e.getMessage()); + } + } + @Override public int read(ByteBuffer out) { int bytesToCopy; @@ -116,10 +168,10 @@ public ByteBuffer slice(int length) throws EOFException { throw new EOFException(); } - // length is less than remaining, so it must fit in an int ByteBuffer copy = buffer.duplicate(); - copy.limit(copy.position() + length); - buffer.position(buffer.position() + length); + int en = buffer.position() + length; + copy.limit(en); + buffer.position(en); return copy; } @@ -150,6 +202,23 @@ public List remainingBuffers() { return Collections.singletonList(remaining); } + @Override + public ByteBufferInputStream remainingStream() { + // Constructor makes duplicate, so we don't have to explicitly make a duplicate here + ByteBufferInputStream remaining = new SingleBufferInputStream(buffer); + buffer.position(buffer.limit()); + return remaining; + } + + @Override + public ByteBufferInputStream sliceStream(long length) throws EOFException { + if (length > buffer.remaining()) throw new EOFException(); + + ByteBufferInputStream remaining = new SingleBufferInputStream(buffer, buffer.position(), (int)length); + buffer.position(buffer.position() + (int)length); + return remaining; + } + @Override public void mark(int readlimit) { this.mark = buffer.position(); @@ -174,4 +243,72 @@ public boolean markSupported() { public int available() { return buffer.remaining(); } + + /* + For all read methods, if we read off the end of the ByteBuffer, BufferUnderflowException is thrown, which + we catch and turn into an EOFException. This is measured to be faster than explicitly checking if the ByteBuffer + has any remaining bytes. + */ + @Override + public byte readByte() throws IOException { + try { + return buffer.get(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + @Override + public int readUnsignedByte() throws IOException { + try { + return buffer.get() & 0xFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + /* + Use ByteBuffer.getShort(), which takes advantage of platform intrinsics + */ + @Override + public short readShort() throws IOException { + try { + return buffer.getShort(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + @Override + public int readUnsignedShort() throws IOException { + try { + return buffer.getShort() & 0xFFFF; + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + /* + Use ByteBuffer.getInt(), which takes advantage of platform intrinsics + */ + @Override + public int readInt() throws IOException { + try { + return buffer.getInt(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } + + /* + Use ByteBuffer.getLong(), which takes advantage of platform intrinsics + */ + @Override + public long readLong() throws IOException { + try { + return buffer.getLong(); + } catch (BufferUnderflowException e) { + throw new EOFException(e.getMessage()); + } + } } diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java index 0dc565f0f3..f4d5ea1a96 100644 --- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java @@ -53,6 +53,17 @@ public void testRead0() throws Exception { 0, stream.read(bytes)); } + @Test + public void testSkip0() throws Exception { + ByteBufferInputStream stream = newStream(); + + long bytesRead = stream.skip(100); + Assert.assertTrue("Should skip to end of stream", bytesRead < 100); + + Assert.assertEquals("Should skip 0 bytes at end of stream", + 0, stream.skip(0)); + } + @Test public void testReadAll() throws Exception { byte[] bytes = new byte[35]; @@ -80,6 +91,31 @@ public void testReadAll() throws Exception { checkOriginalData(); } + @Test + public void testReadFully() throws Exception { + byte[] bytes = new byte[35]; + + ByteBufferInputStream stream = newStream(); + + stream.readFully(bytes); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + Assert.assertEquals("Should advance position", 35, stream.position()); + } + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + checkOriginalData(); + } + @Test public void testSmallReads() throws Exception { for (int size = 1; size < 36; size += 1) { @@ -181,6 +217,82 @@ public void testReadByte() throws Exception { checkOriginalData(); } + @Test + public void testReadShort() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + int length2 = length & -2; // Only read a whole multiple of 2 bytes + int residual = length & 1; + + for (int i = 0; i < length2; i += 2) { + Assert.assertEquals("Position should increment", i, stream.position()); + short buffer_value = stream.readShort(); + short expected_value = (short)(i + ((i+1)<<8)); + Assert.assertEquals(expected_value, buffer_value); + } + + // Read the rest of the bytes + for (int i = 0; i < residual; i += 1) { + stream.read(); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + + @Test + public void testReadInt() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + int length2 = length & -4; // Only read a whole multiple of 4 bytes + int residual = length & 3; + + for (int i = 0; i < length2; i += 4) { + Assert.assertEquals("Position should increment", i, stream.position()); + int buffer_value = stream.readInt(); + int expected_value = i + ((i+1)<<8) + ((i+2)<<16) + ((i+3)<<24); + Assert.assertEquals(expected_value, buffer_value); + } + + // Read the rest of the bytes + for (int i = 0; i < residual; i += 1) { + stream.read(); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + + @Test + public void testReadLong() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + int length2 = length & -8; // Only read a whole multiple of 8 bytes + int residual = length & 7; + + for (long i = 0; i < length2; i += 8) { + Assert.assertEquals("Position should increment", i, stream.position()); + long buffer_value = stream.readLong(); + long expected_value = i + ((i+1)<<8) + ((i+2)<<16) + ((i+3)<<24) + + ((i+4)<<32) + ((i+5)<<40) + ((i+6)<<48) + ((i+7)<<56); + Assert.assertEquals(expected_value, buffer_value); + } + + // Read the rest of the bytes + for (int i = 0; i < residual; i += 1) { + stream.read(); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + @Test public void testSlice() throws Exception { ByteBufferInputStream stream = newStream(); diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java new file mode 100644 index 0000000000..beaf7c908c --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +public class TestSingleBufferInputStreamByteArrayConstructor extends TestByteBufferInputStreams { + static final byte[] DATA = new byte[] { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 }; + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + // Nothing to do + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java new file mode 100644 index 0000000000..b7cf52fa37 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStreamByteArrayConstructorOffsetLength.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +public class TestSingleBufferInputStreamByteArrayConstructorOffsetLength extends TestByteBufferInputStreams { + static final byte[] DATA = new byte[] { + (byte) 0xff, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, (byte) 0xfe }; + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA, 1, 35); + } + + @Override + protected void checkOriginalData() { + // Nothing to do + } +}