diff --git a/LICENSE b/LICENSE index 82a12b78272b..064892499911 100644 --- a/LICENSE +++ b/LICENSE @@ -228,6 +228,7 @@ This product includes code from Apache Parquet. * DynMethods.java * DynConstructors.java * AssertHelpers.java +* IOUtil.java readFully and tests Copyright: 2014-2017 The Apache Software Foundation. Home page: https://parquet.apache.org/ diff --git a/api/src/main/java/org/apache/iceberg/io/RangeReadable.java b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java new file mode 100644 index 000000000000..fafdd7b02037 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/io/RangeReadable.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; + +/** + * {@code RangeReadable} is an interface that allows for implementations + * of {@link InputFile} streams to perform positional, range-based reads, which + * are more efficient than unbounded reads in many cloud provider object stores. + * + * Thread safety is not a requirement of the interface and is left to the + * implementation. 
+ * + * If the implementation is also a {@link SeekableInputStream}, the position + * of the stream is not required to be updated based on the positional reads + * performed by this interface. Usage of {@link SeekableInputStream} should + * always seek to the appropriate position for {@link java.io.InputStream} + * based reads. + * + */ +public interface RangeReadable extends Closeable { + + /** + * Fill the provided buffer with the contents of the input source starting + * at {@code position} for the given {@code offset} and {@code length}. + * + * @param position start position of the read + * @param buffer target buffer to copy data + * @param offset offset in the buffer to copy the data + * @param length size of the read + */ + void readFully(long position, byte[] buffer, int offset, int length) throws IOException; + + /** + * Fill the entire buffer with the contents of the input source starting + * at {@code position}. + * + * @param position start position of the read + * @param buffer target buffer to copy data + */ + default void readFully(long position, byte[] buffer) throws IOException { + readFully(position, buffer, 0, buffer.length); + } + + /** + * Read the last {@code length} bytes from the file. + * + * @param buffer the buffer to write data into + * @param offset the offset in the buffer to start writing + * @param length the number of bytes from the end of the object to read + * @return the actual number of bytes read + * @throws IOException if an error occurs while reading + */ + int readTail(byte [] buffer, int offset, int length) throws IOException; + + /** + * Read the full size of the buffer from the end of the file. 
+ * + * @param buffer the buffer to write data into + * @return the actual number of bytes read + * @throws IOException if an error occurs while reading + */ + default int readTail(byte [] buffer) throws IOException { + return readTail(buffer, 0, buffer.length); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java index 394449aee147..92d0f3f3eae9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java @@ -24,6 +24,8 @@ import java.util.Arrays; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.metrics.MetricsContext.Counter; @@ -37,7 +39,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -class S3InputStream extends SeekableInputStream { +class S3InputStream extends SeekableInputStream implements RangeReadable { private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class); private final StackTraceElement[] createStack; @@ -111,6 +113,35 @@ public int read(byte[] b, int off, int len) throws IOException { return bytesRead; } + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + + String range = String.format("bytes=%s-%s", position, position + length - 1); + + IOUtil.readFully(readRange(range), buffer, offset, length); + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + Preconditions.checkPositionIndexes(offset, offset + length, buffer.length); + + String range = 
String.format("bytes=-%s", length); + + return IOUtil.readRemaining(readRange(range), buffer, offset, length); + } + + private InputStream readRange(String range) { + GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder() + .bucket(location.bucket()) + .key(location.key()) + .range(range); + + S3RequestUtil.configureEncryption(awsProperties, requestBuilder); + + return s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()); + } + @Override public void close() throws IOException { super.close(); diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index 9568ac600882..417ba16f71ad 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Random; import org.apache.commons.io.IOUtils; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; import org.junit.Before; import org.junit.ClassRule; @@ -105,6 +106,49 @@ private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byt assertArrayEquals(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd), actual); } + @Test + public void testRangeRead() throws Exception { + S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat"); + int dataSize = 1024 * 1024 * 10; + byte[] expected = randomData(dataSize); + byte[] actual = new byte[dataSize]; + + long position; + int offset; + int length; + + writeS3Data(uri, expected); + + try (RangeReadable in = new S3InputStream(s3, uri)) { + // first 1k + position = 0; + offset = 0; + length = 1024; + readAndCheckRanges(in, expected, position, actual, offset, length); + + // last 1k + position = dataSize - 1024; + offset = dataSize - 1024; + readAndCheckRanges(in, expected, position, actual, offset, length); + + // middle 2k + position = 
/**
 * Static helpers for filling byte buffers from an {@link InputStream}.
 */
public class IOUtil {
  // not meant to be instantiated
  private IOUtil() {
  }

  /**
   * Reads into a buffer from a stream, making multiple read calls if necessary.
   *
   * @param stream an InputStream to read from
   * @param bytes a buffer to write into
   * @param offset starting offset in the buffer for the data
   * @param length length of bytes to copy from the input stream to the buffer
   * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or the
   *     range does not fit in {@code bytes}; no bytes are consumed from the stream in that case
   * @throws EOFException if the end of the stream is reached before reading length bytes
   * @throws IOException if there is an error while reading
   */
  public static void readFully(InputStream stream, byte[] bytes, int offset, int length) throws IOException {
    int bytesRead = readRemaining(stream, bytes, offset, length);
    if (bytesRead < length) {
      throw new EOFException(
          "Reached the end of stream with " + (length - bytesRead) + " bytes left to read");
    }
  }

  /**
   * Reads into a buffer from a stream, making multiple read calls if necessary
   * returning the number of bytes read until end of stream.
   *
   * @param stream an InputStream to read from
   * @param bytes a buffer to write into
   * @param offset starting offset in the buffer for the data
   * @param length length of bytes to copy from the input stream to the buffer
   * @return the number of bytes read
   * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or the
   *     range does not fit in {@code bytes}; no bytes are consumed from the stream in that case
   * @throws IOException if there is an error while reading
   */
  public static int readRemaining(InputStream stream, byte[] bytes, int offset, int length) throws IOException {
    // fail fast and independently of the stream implementation, before touching the stream
    checkRange(bytes, offset, length);

    int pos = offset;
    int remaining = length;
    while (remaining > 0) {
      int bytesRead = stream.read(bytes, pos, remaining);
      if (bytesRead < 0) {
        // end of stream: report what was read so far
        break;
      }

      remaining -= bytesRead;
      pos += bytesRead;
    }

    return length - remaining;
  }

  // Rejects ranges that InputStream.read(byte[], int, int) would also reject.
  private static void checkRange(byte[] bytes, int offset, int length) {
    // written as a subtraction so that offset + length cannot overflow
    if (offset < 0 || length < 0 || length > bytes.length - offset) {
      throw new IndexOutOfBoundsException(
          "Invalid range: offset=" + offset + ", length=" + length + ", buffer length=" + bytes.length);
    }
  }
}
/**
 * A {@link ByteArrayInputStream} over {@link #TEST_ARRAY} whose {@code read} calls can be
 * limited to a scripted sequence of byte budgets, to exercise callers that must cope with
 * short reads.
 *
 * <p>Each budget caps how many bytes may be returned before the stream moves on to the next
 * budget. Once all budgets are used up, reads are passed straight to the underlying stream.
 */
class MockInputStream extends ByteArrayInputStream {

  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

  private final int[] lengths;
  private int current = 0;

  /**
   * @param actualReadLengths byte budgets to apply, in order; may be empty for an unrestricted
   *     stream
   */
  MockInputStream(int... actualReadLengths) {
    super(TEST_ARRAY);
    // budgets are decremented in place by read(), so keep a private copy
    // instead of modifying the caller's array
    this.lengths = actualReadLengths.clone();
  }

  @Override
  public synchronized int read(byte[] b, int off, int len) {
    if (current < lengths.length) {
      if (len <= lengths[current]) {
        // when len == lengths[current], the next read will be 0 bytes
        int bytesRead = super.read(b, off, len);
        if (bytesRead > 0) {
          // only real progress counts against the budget; -1 (end of stream) must not grow it
          lengths[current] -= bytesRead;
        }
        return bytesRead;
      } else {
        // request exceeds what is left of this budget: serve the remainder, then advance
        int bytesRead = super.read(b, off, lengths[current]);
        current += 1;
        return bytesRead;
      }
    } else {
      return super.read(b, off, len);
    }
  }

  /** Returns the current read position within {@link #TEST_ARRAY}. */
  public long getPos() {
    return this.pos;
  }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import org.apache.iceberg.AssertHelpers; +import org.junit.Assert; +import org.junit.Test; + +public class TestIOUtil { + @Test + public void testReadFully() throws Exception { + byte[] buffer = new byte[5]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), buffer); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullySmallReads() throws Exception { + byte[] buffer = new byte[5]; + + MockInputStream stream = new MockInputStream(2, 3, 3); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), buffer); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullyJustRight() throws Exception { + final byte[] buffer = new byte[10]; + + final MockInputStream stream = new MockInputStream(2, 3, 3); + IOUtil.readFully(stream, 
buffer, 0, buffer.length); + + Assert.assertArrayEquals("Byte array contents should match", MockInputStream.TEST_ARRAY, buffer); + Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos()); + + AssertHelpers.assertThrows("Should throw EOFException if no more bytes left", + EOFException.class, () -> { + IOUtil.readFully(stream, buffer, 0, 1); + return null; + }); + } + + @Test + public void testReadFullyUnderflow() { + final byte[] buffer = new byte[11]; + + final MockInputStream stream = new MockInputStream(2, 3, 3); + + AssertHelpers.assertThrows("Should throw EOFException if no more bytes left", + EOFException.class, () -> { + IOUtil.readFully(stream, buffer, 0, buffer.length); + return null; + }); + + Assert.assertArrayEquals("Should have consumed bytes", + MockInputStream.TEST_ARRAY, Arrays.copyOfRange(buffer, 0, 10)); + Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos()); + } + + @Test + public void testReadFullyStartAndLength() throws IOException { + byte[] buffer = new byte[10]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 2, 5); + + Assert.assertArrayEquals("Byte array contents should match", + Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7)); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + + @Test + public void testReadFullyZeroByteRead() throws IOException { + byte[] buffer = new byte[0]; + + MockInputStream stream = new MockInputStream(); + IOUtil.readFully(stream, buffer, 0, buffer.length); + + Assert.assertEquals("Stream position should reflect bytes read", 0, stream.getPos()); + } + + @Test + public void testReadFullySmallReadsWithStartAndLength() throws IOException { + byte[] buffer = new byte[10]; + + MockInputStream stream = new MockInputStream(2, 2, 3); + IOUtil.readFully(stream, buffer, 2, 5); + + Assert.assertArrayEquals("Byte array contents should match", + 
Arrays.copyOfRange(MockInputStream.TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7)); + Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos()); + } + +}