diff --git a/LICENSE b/LICENSE index 82a12b78272b..064892499911 100644 --- a/LICENSE +++ b/LICENSE @@ -228,6 +228,7 @@ This product includes code from Apache Parquet. * DynMethods.java * DynConstructors.java * AssertHelpers.java +* IOUtil.java readFully and tests Copyright: 2014-2017 The Apache Software Foundation. Home page: https://parquet.apache.org/ diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index b62b49f34676..92d0f3f3eae9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -128,7 +128,7 @@ public int readTail(byte[] buffer, int offset, int length) throws IOException { String range = String.format("bytes=-%s", length); - return IOUtil.readToEnd(readRange(range), buffer, offset, length); + return IOUtil.readRemaining(readRange(range), buffer, offset, length); } private InputStream readRange(String range) { diff --git a/core/src/main/java/org/apache/iceberg/io/IOUtil.java b/core/src/main/java/org/apache/iceberg/io/IOUtil.java index d60f6983cdd9..0daf4a019358 100644 --- a/core/src/main/java/org/apache/iceberg/io/IOUtil.java +++ b/core/src/main/java/org/apache/iceberg/io/IOUtil.java @@ -19,6 +19,7 @@ package org.apache.iceberg.io; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -34,12 +35,14 @@ private IOUtil() { * @param bytes a buffer to write into * @param offset starting offset in the buffer for the data * @param length length of bytes to copy from the input stream to the buffer + * @throws EOFException if the end of the stream is reached before reading length bytes * @throws IOException if there is an error while reading */ - @SuppressWarnings("checkstyle:InnerAssignment") public static void readFully(InputStream stream, byte[] bytes, int offset, int length) throws IOException { - if (readToEnd(stream, bytes, offset, length) != length) { - throw new IOException("End of stream reached before completing read"); + int bytesRead = readRemaining(stream, bytes, offset, length); + if (bytesRead < length) { + throw new EOFException( + "Reached the end of stream with " + (length - bytesRead) + " bytes left to read"); } } @@ -51,17 +54,22 @@ public static void readFully(InputStream stream, byte[] bytes, int offset, int l * @param bytes a buffer to write into * @param offset starting offset in the buffer for the data * @param length length of bytes to copy from the input stream to the buffer + * @return the number of bytes read * @throws IOException if there is an error while reading */ - @SuppressWarnings("checkstyle:InnerAssignment") - public static int readToEnd(InputStream stream, byte[] bytes, int offset, int length) throws IOException { + public static int readRemaining(InputStream stream, byte[] bytes, int offset, int length) throws IOException { int pos = offset; - int bytesRead = 0; + int remaining = length; + while (remaining > 0) { + int bytesRead = stream.read(bytes, pos, remaining); + if (bytesRead < 0) { + break; + } - while ((length - bytesRead) > 0 && (bytesRead = stream.read(bytes, pos, length - bytesRead)) > 0) { + remaining -= bytesRead; pos += bytesRead; } - return bytesRead; + return length - remaining; } } diff --git a/core/src/test/java/org/apache/iceberg/io/MockInputStream.java b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java new file mode 100644 index 000000000000..03bdc2d97a06 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/MockInputStream.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.ByteArrayInputStream; + +class MockInputStream extends ByteArrayInputStream { + + static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + private int[] lengths; + private int current = 0; + MockInputStream(int... actualReadLengths) { + super(TEST_ARRAY); + this.lengths = actualReadLengths; + } + + @Override + public synchronized int read(byte[] b, int off, int len) { + if (current < lengths.length) { + if (len <= lengths[current]) { + // when len == lengths[current], the next read will by 0 bytes + int bytesRead = super.read(b, off, len); + lengths[current] -= bytesRead; + return bytesRead; + } else { + int bytesRead = super.read(b, off, lengths[current]); + current += 1; + return bytesRead; + } + } else { + return super.read(b, off, len); + } + } + + public long getPos() { + return this.pos; + } +} + diff --git a/core/src/test/java/org/apache/iceberg/io/TestIOUtil.java b/core/src/test/java/org/apache/iceberg/io/TestIOUtil.java new file mode 100644 index 000000000000..dae92785fc65 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestIOUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import org.apache.iceberg.AssertHelpers; +import org.junit.Assert; +import org.junit.Test; + +public class TestIOUtil { + @Test + public void testReadFully() throws Exception { + byte[] buffer = new byte[5]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), buffer); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullySmallReads() throws Exception { + byte[] buffer = new byte[5]; + + MockInputStream stream = new MockInputStream(2, 3, 3); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), buffer); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullyJustRight() throws Exception { + final byte[] buffer = new byte[10]; + + final MockInputStream stream = new MockInputStream(2, 3, 3); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", MockInputStream.TEST_ARRAY, buffer); + Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos()); + + AssertHelpers.assertThrows("Should throw EOFException if no more bytes left", + EOFException.class, () -> { + IOUtil.readFully(stream, buffer, 0, 1); + return null; + }); + } + + @Test + public void testReadFullyUnderflow() { + final byte[] buffer = new byte[11]; + + final MockInputStream stream = new MockInputStream(2, 3, 3); + + AssertHelpers.assertThrows("Should throw EOFException if no more bytes left", + EOFException.class, () -> { + IOUtil.readFully(stream, buffer, 0, buffer.length); + return null; + }); + + Assert.assertArrayEquals("Should have consumed bytes", + MockInputStream.TEST_ARRAY, Arrays.copyOfRange(buffer, 0, 10)); + Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos()); + } + + @Test + public void testReadFullyStartAndLength() throws IOException { + byte[] buffer = new byte[10]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 2, 5); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7)); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullyZeroByteRead() throws IOException { + byte[] buffer = new byte[0]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertEquals("Stream position should reflect bytes read", 0, stream.getPos()); + } + + @Test + public void testReadFullySmallReadsWithStartAndLength() throws IOException { + byte[] buffer = new byte[10]; + + MockInputStream stream = new MockInputStream(2, 2, 3); + IOUtil.readFully(stream, buffer, 2, 5); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7)); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + +}