diff --git a/LICENSE b/LICENSE index 064892499911..7e567f0a1982 100644 --- a/LICENSE +++ b/LICENSE @@ -229,6 +229,7 @@ This product includes code from Apache Parquet. * DynConstructors.java * AssertHelpers.java * IOUtil.java readFully and tests +* ByteBufferInputStream implementations and tests Copyright: 2014-2017 The Apache Software Foundation. Home page: https://parquet.apache.org/ diff --git a/core/src/main/java/org/apache/iceberg/io/ByteBufferInputStream.java b/core/src/main/java/org/apache/iceberg/io/ByteBufferInputStream.java new file mode 100644 index 000000000000..aa3311a6c53d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/ByteBufferInputStream.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +public abstract class ByteBufferInputStream extends SeekableInputStream { + + public static ByteBufferInputStream wrap(ByteBuffer... buffers) { + if (buffers.length == 1) { + return new SingleBufferInputStream(buffers[0]); + } else { + return new MultiBufferInputStream(Arrays.asList(buffers)); + } + } + + public static ByteBufferInputStream wrap(List buffers) { + if (buffers.size() == 1) { + return new SingleBufferInputStream(buffers.get(0)); + } else { + return new MultiBufferInputStream(buffers); + } + } + + public void skipFully(long length) throws IOException { + long skipped = skip(length); + if (skipped < length) { + throw new EOFException( + "Not enough bytes to skip: " + skipped + " < " + length); + } + } + + public abstract int read(ByteBuffer out); + + public abstract ByteBuffer slice(int length) throws EOFException; + + public abstract List sliceBuffers(long length) throws EOFException; + + public ByteBufferInputStream sliceStream(long length) throws EOFException { + return ByteBufferInputStream.wrap(sliceBuffers(length)); + } + + public abstract List remainingBuffers(); + + public ByteBufferInputStream remainingStream() { + return ByteBufferInputStream.wrap(remainingBuffers()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java b/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java new file mode 100644 index 000000000000..b85d50430535 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +class MultiBufferInputStream extends ByteBufferInputStream { + private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); + + private final List buffers; + private final long length; + + private Iterator iterator; + private ByteBuffer current = EMPTY; + private long position; + + private long mark; + private long markLimit; + private List markBuffers; + + MultiBufferInputStream(List buffers) { + this.buffers = buffers; + + long totalLen = 0; + for (ByteBuffer buffer : buffers) { + totalLen += buffer.remaining(); + } + this.length = totalLen; + + initFromBuffers(); + } + + private void initFromBuffers() { + discardMark(); + this.position = 0; + this.iterator = buffers.stream().map(ByteBuffer::duplicate).iterator(); + nextBuffer(); + } + + @Override + public long getPos() { + return position; + } + + public void seek(long newPosition) throws IOException { + if (newPosition > length) { + throw new EOFException(String.format("Cannot seek to position after end of file: %s", newPosition)); + } + + if (position > newPosition) { + // backward seek requires returning to the initial state + initFromBuffers(); + } + + long bytesToSkip = newPosition - position; + skipFully(bytesToSkip); + } + + @Override + public long skip(long n) { + if (n <= 0) { + return 0; + } + + if (current == null) { + return -1; + } + + long bytesSkipped = 0; + while (bytesSkipped < n) { + if (current.remaining() > 0) { + long bytesToSkip = Math.min(n - bytesSkipped, current.remaining()); + current.position(current.position() + (int) bytesToSkip); + bytesSkipped += bytesToSkip; + this.position += bytesToSkip; + } else if (!nextBuffer()) { + // there are no more buffers + return bytesSkipped > 0 ? bytesSkipped : -1; + } + } + + return bytesSkipped; + } + + @Override + public int read(ByteBuffer out) { + int len = out.remaining(); + if (len <= 0) { + return 0; + } + + if (current == null) { + return -1; + } + + int bytesCopied = 0; + while (bytesCopied < len) { + if (current.remaining() > 0) { + int bytesToCopy; + ByteBuffer copyBuffer; + if (current.remaining() <= out.remaining()) { + // copy all of the current buffer + bytesToCopy = current.remaining(); + copyBuffer = current; + } else { + // copy a slice of the current buffer + bytesToCopy = out.remaining(); + copyBuffer = current.duplicate(); + copyBuffer.limit(copyBuffer.position() + bytesToCopy); + current.position(copyBuffer.position() + bytesToCopy); + } + + out.put(copyBuffer); + bytesCopied += bytesToCopy; + this.position += bytesToCopy; + + } else if (!nextBuffer()) { + // there are no more buffers + return bytesCopied > 0 ? bytesCopied : -1; + } + } + + return bytesCopied; + } + + @Override + public ByteBuffer slice(int len) throws EOFException { + if (len <= 0) { + return EMPTY; + } + + if (current == null) { + throw new EOFException(); + } + + ByteBuffer slice; + if (len > current.remaining()) { + // a copy is needed to return a single buffer + slice = ByteBuffer.allocate(len); + int bytesCopied = read(slice); + slice.flip(); + if (bytesCopied < len) { + throw new EOFException(); + } + } else { + slice = current.duplicate(); + slice.limit(slice.position() + len); + current.position(slice.position() + len); + this.position += len; + } + + return slice; + } + + public List sliceBuffers(long len) throws EOFException { + if (len <= 0) { + return Collections.emptyList(); + } + + if (current == null) { + throw new EOFException(); + } + + List sliceBuffers = Lists.newArrayList(); + long bytesAccumulated = 0; + while (bytesAccumulated < len) { + if (current.remaining() > 0) { + // get a slice of the current buffer to return + // always fits in an int because remaining returns an int that is >= 0 + int bufLen = (int) Math.min(len - bytesAccumulated, current.remaining()); + ByteBuffer slice = current.duplicate(); + slice.limit(slice.position() + bufLen); + sliceBuffers.add(slice); + bytesAccumulated += bufLen; + + // update state; the bytes are considered read + current.position(current.position() + bufLen); + this.position += bufLen; + } else if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + + return sliceBuffers; + } + + @Override + public List remainingBuffers() { + if (position >= length) { + return Collections.emptyList(); + } + + try { + return sliceBuffers(length - position); + } catch (EOFException e) { + throw new RuntimeException( + "[Parquet bug] Stream is bad: incorrect bytes remaining " + + (length - position)); + } + } + + @Override + public int read(byte[] bytes, int off, int len) { + if (len <= 0) { + if (len < 0) { + throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); + } + return 0; + } + + if (current == null) { + return -1; + } + + int bytesRead = 0; + while (bytesRead < len) { + if (current.remaining() > 0) { + int bytesToRead = Math.min(len - bytesRead, current.remaining()); + current.get(bytes, off + bytesRead, bytesToRead); + bytesRead += bytesToRead; + this.position += bytesToRead; + } else if (!nextBuffer()) { + // there are no more buffers + return bytesRead > 0 ? bytesRead : -1; + } + } + + return bytesRead; + } + + @Override + public int read(byte[] bytes) { + return read(bytes, 0, bytes.length); + } + + @Override + public int read() throws IOException { + if (current == null) { + throw new EOFException(); + } + + while (true) { + if (current.remaining() > 0) { + this.position += 1; + return current.get() & 0xFF; // as unsigned + } else if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + } + + @Override + public int available() { + long remaining = length - position; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) remaining; + } + } + + @Override + public void mark(int readlimit) { + if (mark >= 0) { + discardMark(); + } + this.mark = position; + this.markLimit = mark + readlimit + 1; + if (current != null) { + markBuffers.add(current.duplicate()); + } + } + + @Override + public void reset() throws IOException { + if (mark >= 0 && position < markLimit) { + this.position = mark; + // replace the current iterator with one that adds back the buffers that + // have been used since mark was called. + this.iterator = Iterators.concat(markBuffers.iterator(), iterator); + discardMark(); + nextBuffer(); // go back to the marked buffers + } else { + throw new IOException("No mark defined or has read past the previous mark limit"); + } + } + + private void discardMark() { + this.mark = -1; + this.markLimit = 0; + markBuffers = Lists.newArrayList(); + } + + @Override + public boolean markSupported() { + return true; + } + + private boolean nextBuffer() { + if (!iterator.hasNext()) { + this.current = null; + return false; + } + + this.current = iterator.next().duplicate(); + + if (mark >= 0) { + if (position < markLimit) { + // the mark is defined and valid. save the new buffer + markBuffers.add(current.duplicate()); + } else { + // the mark has not been used and is no longer valid + discardMark(); + } + } + + return true; + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java b/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java new file mode 100644 index 000000000000..88c5b79a24ea --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +/** + * This ByteBufferInputStream does not consume the ByteBuffer being passed in, + * but will create a slice of the current buffer. + */ +class SingleBufferInputStream extends ByteBufferInputStream { + + private final ByteBuffer original; + private final long startPosition; + private final int length; + private ByteBuffer buffer; + private int mark; + + SingleBufferInputStream(ByteBuffer buffer) { + // duplicate the buffer because its state will be modified + this.original = buffer; + this.startPosition = buffer.position(); + this.length = original.remaining(); + initFromBuffer(); + } + + private void initFromBuffer() { + this.mark = -1; + this.buffer = original.duplicate(); + } + + @Override + public long getPos() { + // position is relative to the start of the stream, not the buffer + return buffer.position() - startPosition; + } + + @Override + public int read() throws IOException { + if (!buffer.hasRemaining()) { + throw new EOFException(); + } + return buffer.get() & 0xFF; // as unsigned + } + + @Override + public int read(byte[] bytes, int offset, int len) throws IOException { + if (len == 0) { + return 0; + } + + int remaining = buffer.remaining(); + if (remaining <= 0) { + return -1; + } + + int bytesToRead = Math.min(buffer.remaining(), len); + buffer.get(bytes, offset, bytesToRead); + + return bytesToRead; + } + + @Override + public void seek(long newPosition) throws IOException { + if (newPosition > length) { + throw new EOFException(String.format("Cannot seek to position after end of file: %s", newPosition)); + } + + if (getPos() > newPosition) { + // backwards seek requires returning to the initial state + initFromBuffer(); + } + + long bytesToSkip = newPosition - getPos(); + skipFully(bytesToSkip); + } + + @Override + public long skip(long len) { + if (len == 0) { + return 0; + } + + if (buffer.remaining() <= 0) { + return -1; + } + + // buffer.remaining is an int, so this will always fit in an int + int bytesToSkip = (int) Math.min(buffer.remaining(), len); + buffer.position(buffer.position() + bytesToSkip); + + return bytesToSkip; + } + + @Override + public int read(ByteBuffer out) { + int bytesToCopy; + ByteBuffer copyBuffer; + if (buffer.remaining() <= out.remaining()) { + // copy all of the buffer + bytesToCopy = buffer.remaining(); + copyBuffer = buffer; + } else { + // copy a slice of the current buffer + bytesToCopy = out.remaining(); + copyBuffer = buffer.duplicate(); + copyBuffer.limit(buffer.position() + bytesToCopy); + buffer.position(buffer.position() + bytesToCopy); + } + + out.put(copyBuffer); + out.flip(); + + return bytesToCopy; + } + + @Override + public ByteBuffer slice(int len) throws EOFException { + if (buffer.remaining() < len) { + throw new EOFException(); + } + + // length is less than remaining, so it must fit in an int + ByteBuffer copy = buffer.duplicate(); + copy.limit(copy.position() + len); + buffer.position(buffer.position() + len); + + return copy; + } + + @Override + public List sliceBuffers(long len) throws EOFException { + if (len == 0) { + return Collections.emptyList(); + } + + if (len > buffer.remaining()) { + throw new EOFException(); + } + + // length is less than remaining, so it must fit in an int + return Collections.singletonList(slice((int) len)); + } + + @Override + public List remainingBuffers() { + if (buffer.remaining() <= 0) { + return Collections.emptyList(); + } + + ByteBuffer remaining = buffer.duplicate(); + buffer.position(buffer.limit()); + + return Collections.singletonList(remaining); + } + + @Override + public void mark(int readlimit) { + this.mark = buffer.position(); + } + + @Override + public void reset() throws IOException { + if (mark >= 0) { + buffer.position(mark); + this.mark = -1; + } else { + throw new IOException("No mark defined"); + } + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public int available() { + return buffer.remaining(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/io/TestByteBufferInputStreams.java b/core/src/test/java/org/apache/iceberg/io/TestByteBufferInputStreams.java new file mode 100644 index 000000000000..3bab66b81575 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestByteBufferInputStreams.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +public abstract class TestByteBufferInputStreams { + + protected abstract ByteBufferInputStream newStream(); + protected abstract void checkOriginalData(); + + @Test + public void testRead0() throws Exception { + byte[] bytes = new byte[0]; + + ByteBufferInputStream stream = newStream(); + + Assert.assertEquals("Should read 0 bytes", 0, stream.read(bytes)); + + int bytesRead = stream.read(new byte[100]); + Assert.assertTrue("Should read to end of stream", bytesRead < 100); + + Assert.assertEquals("Should read 0 bytes at end of stream", + 0, stream.read(bytes)); + } + + @Test + public void testReadAll() throws Exception { + byte[] bytes = new byte[35]; + + ByteBufferInputStream stream = newStream(); + + int bytesRead = stream.read(bytes); + Assert.assertEquals("Should read the entire buffer", + bytes.length, bytesRead); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + Assert.assertEquals("Should advance position", 35, stream.getPos()); + } + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + checkOriginalData(); + } + + @Test + public void testSmallReads() throws Exception { + for (int size = 1; size < 36; size += 1) { + byte[] bytes = new byte[size]; + + ByteBufferInputStream stream = newStream(); + long length = stream.available(); + + int lastBytesRead = bytes.length; + for (int offset = 0; offset < length; offset += bytes.length) { + Assert.assertEquals("Should read requested len", + bytes.length, lastBytesRead); + + lastBytesRead = stream.read(bytes, 0, bytes.length); + + Assert.assertEquals("Should advance position", + offset + lastBytesRead, stream.getPos()); + + // validate the bytes that were read + for (int i = 0; i < lastBytesRead; i += 1) { + Assert.assertEquals("Byte i should be i", offset + i, bytes[i]); + } + } + + Assert.assertEquals("Should read fewer bytes at end of buffer", + length % bytes.length, lastBytesRead % bytes.length); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + } + + checkOriginalData(); + } + + @Test + public void testPartialBufferReads() throws Exception { + for (int size = 1; size < 35; size += 1) { + byte[] bytes = new byte[33]; + + ByteBufferInputStream stream = newStream(); + + int lastBytesRead = size; + for (int offset = 0; offset < bytes.length; offset += size) { + Assert.assertEquals("Should read requested len", size, lastBytesRead); + + lastBytesRead = stream.read( + bytes, offset, Math.min(size, bytes.length - offset)); + + Assert.assertEquals("Should advance position", + lastBytesRead > 0 ? offset + lastBytesRead : offset, + stream.getPos()); + } + + Assert.assertEquals("Should read fewer bytes at end of buffer", + bytes.length % size, lastBytesRead % size); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + } + + Assert.assertEquals("Should have no more remaining content", + 2, stream.available()); + + Assert.assertEquals("Should return 2 more bytes", + 2, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + } + + checkOriginalData(); + } + + @Test + public void testReadByte() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Position should increment", i, stream.getPos()); + Assert.assertEquals(i, stream.read()); + } + + AssertHelpers.assertThrows("Should throw EOFException at end of stream", + EOFException.class, (Callable) stream::read); + + checkOriginalData(); + } + + @Test + @SuppressWarnings("LocalVariableName") + public void testSlice() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + ByteBuffer empty = stream.slice(0); + Assert.assertNotNull("slice(0) should produce a non-null buffer", empty); + Assert.assertEquals("slice(0) should produce an empty buffer", + 0, empty.remaining()); + + Assert.assertEquals("Position should be at start", 0, stream.getPos()); + + int i = 0; + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 10); + ByteBuffer buffer = stream.slice(bytesToSlice); + + for (int j = 0; j < bytesToSlice; j += 1) { + Assert.assertEquals("Data should be correct", i + j, buffer.get()); + } + + i += bytesToSlice; + } + + Assert.assertEquals("Position should be at end", length, stream.getPos()); + + checkOriginalData(); + } + + @Test + public void testSliceBuffers0() throws Exception { + ByteBufferInputStream stream = newStream(); + + Assert.assertEquals("Should return an empty list", + Collections.emptyList(), stream.sliceBuffers(0)); + } + + @Test + public void testWholeSliceBuffers() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = stream.sliceBuffers(stream.available()); + + Assert.assertEquals("Should consume all buffers", length, stream.getPos()); + + AssertHelpers.assertThrows("Should throw EOFException when empty", + EOFException.class, () -> stream.sliceBuffers(length)); + + ByteBufferInputStream copy = ByteBufferInputStream.wrap(buffers); + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Slice should have identical data", i, copy.read()); + } + + checkOriginalData(); + } + + @Test + public void testSliceBuffersCoverage() throws Exception { + for (int size = 1; size < 36; size += 1) { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = Lists.newArrayList(); + while (stream.available() > 0) { + buffers.addAll(stream.sliceBuffers(Math.min(size, stream.available()))); + } + + Assert.assertEquals("Should consume all content", + length, stream.getPos()); + + ByteBufferInputStream newStream = new MultiBufferInputStream(buffers); + + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Data should be correct", i, newStream.read()); + } + } + + checkOriginalData(); + } + + @Test + public void testSliceBuffersModification() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + int sliceLength = 5; + List buffers = stream.sliceBuffers(sliceLength); + Assert.assertEquals("Should advance the original stream", + length - sliceLength, stream.available()); + Assert.assertEquals("Should advance the original stream position", + sliceLength, stream.getPos()); + + Assert.assertEquals("Should return a slice of the first buffer", + 1, buffers.size()); + + ByteBuffer buffer = buffers.get(0); + Assert.assertEquals("Should have requested bytes", + sliceLength, buffer.remaining()); + + // read the buffer one past the returned limit. this should not change the + // next value in the original stream + buffer.limit(sliceLength + 1); + for (int i = 0; i < sliceLength + 1; i += 1) { + Assert.assertEquals("Should have correct data", i, buffer.get()); + } + + Assert.assertEquals("Reading a slice shouldn't advance the original stream", + sliceLength, stream.getPos()); + Assert.assertEquals("Reading a slice shouldn't change the underlying data", + sliceLength, stream.read()); + + // change the underlying data buffer + buffer.limit(sliceLength + 2); + int originalValue = buffer.duplicate().get(); + ByteBuffer undoBuffer = buffer.duplicate(); + + try { + buffer.put((byte) 255); + + Assert.assertEquals( + "Writing to a slice shouldn't advance the original stream", + sliceLength + 1, stream.getPos()); + Assert.assertEquals( + "Writing to a slice should change the underlying data", + 255, stream.read()); + + } finally { + undoBuffer.put((byte) originalValue); + } + } + + @Test + public void testSkip() throws Exception { + ByteBufferInputStream stream = newStream(); + + while (stream.available() > 0) { + int bytesToSkip = Math.min(stream.available(), 10); + Assert.assertEquals("Should skip all, regardless of backing buffers", + bytesToSkip, stream.skip(bytesToSkip)); + } + + stream = newStream(); + Assert.assertEquals(0, stream.skip(0)); + + int length = stream.available(); + Assert.assertEquals("Should stop at end when out of bytes", + length, stream.skip(length + 10)); + Assert.assertEquals("Should return -1 when at end", + -1, stream.skip(10)); + } + + @Test + public void testSkipFully() throws Exception { + ByteBufferInputStream stream = newStream(); + + long lastPosition = 0; + while (stream.available() > 0) { + int bytesToSkip = Math.min(stream.available(), 10); + + stream.skipFully(bytesToSkip); + + Assert.assertEquals("Should skip all, regardless of backing buffers", + bytesToSkip, stream.getPos() - lastPosition); + + lastPosition = stream.getPos(); + } + + ByteBufferInputStream stream2 = newStream(); + stream2.skipFully(0); + Assert.assertEquals(0, stream2.getPos()); + + int length = stream2.available(); + AssertHelpers.assertThrows("Should throw when out of bytes", + EOFException.class, () -> { + stream2.skipFully(length + 10); + return null; + }); + } + + @Test + public void testMark() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.read(new byte[7]); + stream.mark(100); + + long mark = stream.getPos(); + + byte[] expected = new byte[100]; + int expectedBytesRead = stream.read(expected); + + long end = stream.getPos(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.getPos()); + + byte[] afterReset = new byte[100]; + int bytesReadAfterReset = stream.read(afterReset); + + Assert.assertEquals("Should read the same number of bytes", + expectedBytesRead, bytesReadAfterReset); + + Assert.assertEquals("Read should end at the same position", + end, stream.getPos()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkTwice() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.read(new byte[7]); + stream.mark(1); + stream.mark(100); + + long mark = stream.getPos(); + + byte[] expected = new byte[100]; + int expectedBytesRead = stream.read(expected); + + long end = stream.getPos(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.getPos()); + + byte[] afterReset = new byte[100]; + int bytesReadAfterReset = stream.read(afterReset); + + Assert.assertEquals("Should read the same number of bytes", + expectedBytesRead, bytesReadAfterReset); + + Assert.assertEquals("Read should end at the same position", + end, stream.getPos()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkAtStart() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.mark(100); + + long mark = stream.getPos(); + + byte[] expected = new byte[10]; + Assert.assertEquals("Should read 10 bytes", 10, stream.read(expected)); + + long end = stream.getPos(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.getPos()); + + byte[] afterReset = new byte[10]; + Assert.assertEquals("Should read 10 bytes", 10, stream.read(afterReset)); + + Assert.assertEquals("Read should end at the same position", + end, stream.getPos()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkAtEnd() throws Exception { + ByteBufferInputStream stream = newStream(); + + int bytesRead = stream.read(new byte[100]); + Assert.assertTrue("Should read to end of stream", bytesRead < 100); + + stream.mark(100); + + long mark = stream.getPos(); + + byte[] expected = new byte[10]; + Assert.assertEquals("Should read 0 bytes", -1, stream.read(expected)); + + long end = stream.getPos(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.getPos()); + + byte[] afterReset = new byte[10]; + Assert.assertEquals("Should read 0 bytes", -1, stream.read(afterReset)); + + Assert.assertEquals("Read should end at the same position", + end, stream.getPos()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkUnset() { + ByteBufferInputStream stream = newStream(); + + AssertHelpers.assertThrows("Should throw an error for reset() without mark()", + IOException.class, () -> { + stream.reset(); + return null; + }); + } + + @Test + public void testMarkAndResetTwiceOverSameRange() throws Exception { + ByteBufferInputStream stream = newStream(); + + byte[] expected = new byte[6]; + stream.mark(10); + Assert.assertEquals("Should read expected bytes", + expected.length, stream.read(expected)); + + stream.reset(); + stream.mark(10); + + byte[] firstRead = new byte[6]; + Assert.assertEquals("Should read firstRead bytes", + firstRead.length, stream.read(firstRead)); + + stream.reset(); + + byte[] secondRead = new byte[6]; + Assert.assertEquals("Should read secondRead bytes", + secondRead.length, stream.read(secondRead)); + + Assert.assertArrayEquals("First read should be correct", + expected, firstRead); + + Assert.assertArrayEquals("Second read should be correct", + expected, secondRead); + } + + @Test + public void testMarkLimit() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.mark(5); + Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5])); + + stream.reset(); + + Assert.assertEquals("Should read 6 bytes", 6, stream.read(new byte[6])); + + AssertHelpers.assertThrows("Should throw an error for reset() after limit", + IOException.class, () -> { + stream.reset(); + return null; + }); + } + + @Test + public void testMarkDoubleReset() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.mark(5); + Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5])); + + stream.reset(); + + AssertHelpers.assertThrows("Should throw an error for double reset()", + IOException.class, () -> { + stream.reset(); + return null; + }); + } +} diff --git a/core/src/test/java/org/apache/iceberg/io/TestMultiBufferInputStream.java b/core/src/test/java/org/apache/iceberg/io/TestMultiBufferInputStream.java new file mode 100644 index 000000000000..eca929e040df --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestMultiBufferInputStream.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +public class TestMultiBufferInputStream extends TestByteBufferInputStreams { + private static final List DATA = Arrays.asList( + ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }), + ByteBuffer.wrap(new byte[] { 9, 10, 11, 12 }), + ByteBuffer.wrap(new byte[] { }), + ByteBuffer.wrap(new byte[] { 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24 }), + ByteBuffer.wrap(new byte[] { 25 }), + ByteBuffer.wrap(new byte[] { 26, 27, 28, 29, 30, 31, 32 }), + ByteBuffer.wrap(new byte[] { 33, 34 }) + ); + + @Override + protected ByteBufferInputStream newStream() { + return new MultiBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + for (ByteBuffer buffer : DATA) { + Assert.assertEquals("Position should not change", 0, buffer.position()); + Assert.assertEquals("Limit should not change", + buffer.array().length, buffer.limit()); + } + } + + @Test + @SuppressWarnings("LocalVariableName") + public void testSliceData() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = Lists.newArrayList(); + // slice the stream into 3 8-byte buffers and 1 2-byte buffer + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 8); + buffers.add(stream.slice(bytesToSlice)); + } + + Assert.assertEquals("Position should be at end", length, stream.getPos()); + Assert.assertEquals("Should produce 5 buffers", 5, buffers.size()); + + int i = 0; + + // one is a view of the first buffer because it is smaller + ByteBuffer one = buffers.get(0); + Assert.assertSame("Should be a duplicate of the first array", + one.array(), DATA.get(0).array()); + Assert.assertEquals(8, one.remaining()); + Assert.assertEquals(0, one.position()); + Assert.assertEquals(8, one.limit()); + Assert.assertEquals(9, one.capacity()); + for (; i < 8; i += 1) { + Assert.assertEquals("Should produce correct values", i, one.get()); + } + + // two should be a copy of the next 8 bytes + ByteBuffer two = buffers.get(1); + Assert.assertEquals(8, two.remaining()); + Assert.assertEquals(0, two.position()); + Assert.assertEquals(8, two.limit()); + Assert.assertEquals(8, two.capacity()); + for (; i < 16; i += 1) { + Assert.assertEquals("Should produce correct values", i, two.get()); + } + + // three is a copy of part of the 4th buffer + ByteBuffer three = buffers.get(2); + Assert.assertSame("Should be a duplicate of the fourth array", + three.array(), DATA.get(3).array()); + Assert.assertEquals(8, three.remaining()); + Assert.assertEquals(3, three.position()); + Assert.assertEquals(11, three.limit()); + Assert.assertEquals(12, three.capacity()); + for (; i < 24; i += 1) { + Assert.assertEquals("Should produce correct values", i, three.get()); + } + + // four should be a copy of the next 8 bytes + ByteBuffer four = buffers.get(3); + Assert.assertEquals(8, four.remaining()); + Assert.assertEquals(0, four.position()); + Assert.assertEquals(8, four.limit()); + Assert.assertEquals(8, four.capacity()); + for (; i < 32; i += 1) { + Assert.assertEquals("Should produce correct values", i, four.get()); + } + + // five should be a copy of the next 8 bytes + ByteBuffer five = buffers.get(4); + Assert.assertEquals(3, five.remaining()); + Assert.assertEquals(0, five.position()); + Assert.assertEquals(3, five.limit()); + Assert.assertEquals(3, five.capacity()); + for (; i < 35; i += 1) { + Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + @Test + public void testSliceBuffersData() throws Exception { + ByteBufferInputStream stream = newStream(); + + List buffers = stream.sliceBuffers(stream.available()); + List nonEmptyBuffers = Lists.newArrayList(); + for (ByteBuffer buffer : DATA) { + if (buffer.remaining() > 0) { + nonEmptyBuffers.add(buffer); + } + } + + Assert.assertEquals("Should return duplicates of all non-empty buffers", + nonEmptyBuffers, buffers); + } +} diff --git a/core/src/test/java/org/apache/iceberg/io/TestSingleBufferInputStream.java b/core/src/test/java/org/apache/iceberg/io/TestSingleBufferInputStream.java new file mode 100644 index 000000000000..512eaf2afcd4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestSingleBufferInputStream.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +public class TestSingleBufferInputStream extends TestByteBufferInputStreams { + private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 }); + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + Assert.assertEquals("Position should not change", 0, DATA.position()); + Assert.assertEquals("Limit should not change", + DATA.array().length, DATA.limit()); + } + + @Test + @SuppressWarnings("LocalVariableName") + public void testSliceData() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = Lists.newArrayList(); + // slice the stream into 3 8-byte buffers and 1 2-byte buffer + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 8); + buffers.add(stream.slice(bytesToSlice)); + } + + Assert.assertEquals("Position should be at end", length, stream.getPos()); + Assert.assertEquals("Should produce 5 buffers", 5, buffers.size()); + + int i = 0; + + ByteBuffer one = buffers.get(0); + Assert.assertSame("Should use the same backing array", + one.array(), DATA.array()); + Assert.assertEquals(8, one.remaining()); + Assert.assertEquals(0, one.position()); + Assert.assertEquals(8, one.limit()); + Assert.assertEquals(35, one.capacity()); + for (; i < 8; i += 1) { + Assert.assertEquals("Should produce correct values", i, one.get()); + } + + ByteBuffer two = buffers.get(1); + Assert.assertSame("Should use the same backing array", + two.array(), DATA.array()); + Assert.assertEquals(8, two.remaining()); + Assert.assertEquals(8, two.position()); + Assert.assertEquals(16, two.limit()); + Assert.assertEquals(35, two.capacity()); + for (; i < 16; i += 1) { + Assert.assertEquals("Should produce correct values", i, two.get()); + } + + // three is a copy of part of the 4th buffer + ByteBuffer three = buffers.get(2); + Assert.assertSame("Should use the same backing array", + three.array(), DATA.array()); + Assert.assertEquals(8, three.remaining()); + Assert.assertEquals(16, three.position()); + Assert.assertEquals(24, three.limit()); + Assert.assertEquals(35, three.capacity()); + for (; i < 24; i += 1) { + Assert.assertEquals("Should produce correct values", i, three.get()); + } + + // four should be a copy of the next 8 bytes + ByteBuffer four = buffers.get(3); + Assert.assertSame("Should use the same backing array", + four.array(), DATA.array()); + Assert.assertEquals(8, four.remaining()); + Assert.assertEquals(24, four.position()); + Assert.assertEquals(32, four.limit()); + Assert.assertEquals(35, four.capacity()); + for (; i < 32; i += 1) { + Assert.assertEquals("Should produce correct values", i, four.get()); + } + + // five should be a copy of the next 8 bytes + ByteBuffer five = buffers.get(4); + Assert.assertSame("Should use the same backing array", + five.array(), DATA.array()); + Assert.assertEquals(3, five.remaining()); + Assert.assertEquals(32, five.position()); + Assert.assertEquals(35, five.limit()); + Assert.assertEquals(35, five.capacity()); + for (; i < 35; i += 1) { + Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + @Test + public void testWholeSliceBuffersData() throws Exception { + ByteBufferInputStream stream = newStream(); + + List buffers = stream.sliceBuffers(stream.available()); + Assert.assertEquals("Should return duplicates of all non-empty buffers", + Collections.singletonList(DATA), buffers); + } +}