diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java index c6893c57e969..78c4bf4961d9 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.ozone; import java.io.BufferedInputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -50,14 +51,18 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.apache.hadoop.hdds.StringUtils.string2Bytes; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * Test OzoneFSInputStream by reading through multiple interfaces. 
@@ -162,6 +167,124 @@ public void testO3FSSingleByteRead() throws IOException { } } + @Test + public void testByteBufferPositionedRead() throws IOException { + try (FSDataInputStream inputStream = fs.open(filePath)) { + int bufferCapacity = 20; + ByteBuffer buffer = ByteBuffer.allocate(bufferCapacity); + long currentPos = inputStream.getPos(); + // Read positional data from 50th index + int position = 50; + int readBytes = inputStream.read(position, buffer); + + // File position should not be changed after positional read + assertEquals(currentPos, inputStream.getPos()); + // Total read bytes should be equal to bufferCapacity + // As file has more data than bufferCapacity + assertEquals(bufferCapacity, readBytes); + byte[] value1 = new byte[readBytes]; + System.arraycopy(buffer.array(), 0, value1, 0, readBytes); + byte[] value2 = new byte[readBytes]; + System.arraycopy(data, position, value2, 0, readBytes); + // Verify input and positional read data + assertArrayEquals(value1, value2, "value mismatch"); + buffer.clear(); + + // Read positional from 8th index again using same inputStream + position = 8; + readBytes = inputStream.read(position, buffer); + assertEquals(currentPos, inputStream.getPos()); + assertEquals(bufferCapacity, readBytes); + byte[] value3 = new byte[readBytes]; + System.arraycopy(buffer.array(), 0, value3, 0, readBytes); + byte[] value4 = new byte[readBytes]; + System.arraycopy(data, position, value4, 0, readBytes); + // Verify input and positional read data + assertArrayEquals(value3, value4, "value mismatch"); + + // Buffer size more than actual data, still read should succeed + ByteBuffer buffer1 = ByteBuffer.allocate(30 * 1024 * 1024 * 2); + // Read positional from 12th index + position = 12; + readBytes = inputStream.read(position, buffer1); + assertEquals(currentPos, inputStream.getPos()); + // Total read bytes should be (total file bytes - position) as buffer is not filled completely + assertEquals(30 * 1024 * 1024 - position, readBytes); 
+ + byte[] value5 = new byte[readBytes]; + System.arraycopy(buffer1.array(), 0, value5, 0, readBytes); + byte[] value6 = new byte[readBytes]; + System.arraycopy(data, position, value6, 0, readBytes); + // Verify input and positional read data + assertArrayEquals(value5, value6, "value mismatch"); + } + } + + @ParameterizedTest + @ValueSource(ints = { -1, 30 * 1024 * 1024, 30 * 1024 * 1024 + 1 }) + public void testByteBufferPositionedReadWithInvalidPosition(int position) throws IOException { + try (FSDataInputStream inputStream = fs.open(filePath)) { + long currentPos = inputStream.getPos(); + ByteBuffer buffer = ByteBuffer.allocate(20); + assertEquals(-1, inputStream.read(position, buffer)); + // File position should not be changed + assertEquals(currentPos, inputStream.getPos()); + } + } + + @Test + public void testByteBufferPositionedReadFully() throws IOException { + try (FSDataInputStream inputStream = fs.open(filePath)) { + int bufferCapacity = 20; + long currentPos = inputStream.getPos(); + ByteBuffer buffer = ByteBuffer.allocate(bufferCapacity); + // Read positional data from 50th index + int position = 50; + inputStream.readFully(position, buffer); + // File position should not be changed after positional readFully + assertEquals(currentPos, inputStream.getPos()); + // Make sure buffer is full after readFully + Assertions.assertThat(buffer.hasRemaining()).isFalse(); + + byte[] value1 = new byte[bufferCapacity]; + System.arraycopy(buffer.array(), 0, value1, 0, bufferCapacity); + byte[] value2 = new byte[bufferCapacity]; + System.arraycopy(data, position, value2, 0, bufferCapacity); + // Verify input and positional read data + assertArrayEquals(value1, value2, "value mismatch"); + buffer.clear(); + + // Read positional from 8th index again using same inputStream + position = 8; + inputStream.readFully(position, buffer); + assertEquals(currentPos, inputStream.getPos()); + Assertions.assertThat(buffer.hasRemaining()).isFalse(); + byte[] value3 = new byte[bufferCapacity]; + 
System.arraycopy(buffer.array(), 0, value3, 0, bufferCapacity); + byte[] value4 = new byte[bufferCapacity]; + System.arraycopy(data, position, value4, 0, bufferCapacity); + // Verify input and positional read data + assertArrayEquals(value3, value4, "value mismatch"); + + // Buffer size is more than actual data, readFully should fail in this case + ByteBuffer buffer1 = ByteBuffer.allocate(30 * 1024 * 1024 * 2); + assertThrows(EOFException.class, () -> inputStream.readFully(12, buffer1)); + assertEquals(currentPos, inputStream.getPos()); + } + } + + @ParameterizedTest + @ValueSource(ints = { -1, 30 * 1024 * 1024, 30 * 1024 * 1024 + 1 }) + public void testByteBufferPositionedReadFullyWithInvalidPosition(int position) throws IOException { + try (FSDataInputStream inputStream = fs.open(filePath)) { + long currentPos = inputStream.getPos(); + ByteBuffer buffer = ByteBuffer.allocate(20); + assertThrows(EOFException.class, () -> inputStream.readFully(position, buffer)); + // File position should not be changed + assertEquals(currentPos, inputStream.getPos()); + } + } + @Test public void testO3FSMultiByteRead() throws IOException { try (FSDataInputStream inputStream = fs.open(filePath)) { diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java new file mode 100644 index 000000000000..f5d0c8521334 --- /dev/null +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * FIXME: Hack: This is copied from Hadoop 3.3.6. Remove this interface once + * we drop Hadoop 3.1, 3.2 support. + * Implementers of this interface provide a positioned read API that writes to a + * {@link ByteBuffer} rather than a {@code byte[]}. + * + * @see PositionedReadable + * @see ByteBufferReadable + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ByteBufferPositionedReadable { + /** + * Reads up to {@code buf.remaining()} bytes into buf from a given position + * in the file and returns the number of bytes read. Callers should use + * {@code buf.limit(...)} to control the size of the desired read and + * {@code buf.position(...)} to control the offset into the buffer the data + * should be written to. + *

+ * After a successful call, {@code buf.position()} will be advanced by the + * number of bytes read and {@code buf.limit()} will be unchanged. + *

+ * In the case of an exception, the state of the buffer (the contents of the + * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is + * undefined, and callers should be prepared to recover from this + * eventuality. + *

+ * Callers should use {@link StreamCapabilities#hasCapability(String)} with + * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying + * stream supports this interface, otherwise they might get a + * {@link UnsupportedOperationException}. + *

+ * Implementations should treat 0-length requests as legitimate, and must not + * signal an error upon their receipt. + *

+ * This does not change the current offset of a file, and is thread-safe. + * + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @return the number of bytes read, possibly zero, or -1 if reached + * end-of-stream + * @throws IOException if there is some error performing the read + */ + int read(long position, ByteBuffer buf) throws IOException; + + /** + * Reads {@code buf.remaining()} bytes into buf from a given position in + * the file or until the end of the data was reached before the read + * operation completed. Callers should use {@code buf.limit(...)} to + * control the size of the desired read and {@code buf.position(...)} to + * control the offset into the buffer the data should be written to. + *

+ * This operation provides similar semantics to + * {@link #read(long, ByteBuffer)}, the difference is that this method is + * guaranteed to read data until the {@link ByteBuffer} is full, or until + * the end of the data stream is reached. + * + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @throws IOException if there is some error performing the read + * @throws EOFException the end of the data was reached before + * the read operation completed + * @see #read(long, ByteBuffer) + */ + void readFully(long position, ByteBuffer buf) throws IOException; +} diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSInputStream.java index 290546e4a104..30e0c32265bf 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSInputStream.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/CapableOzoneFSInputStream.java @@ -35,6 +35,7 @@ public boolean hasCapability(String capability) { switch (StringUtils.toLowerCase(capability)) { case StreamCapabilities.READBYTEBUFFER: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: return true; default: return false; diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java index 918640799c71..b1d7d92e9f2e 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.ozone; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -27,6 +28,7 @@ 
import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.Seekable; @@ -40,7 +42,7 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class OzoneFSInputStream extends FSInputStream - implements ByteBufferReadable, CanUnbuffer { + implements ByteBufferReadable, CanUnbuffer, ByteBufferPositionedReadable { private final InputStream inputStream; private final Statistics statistics; @@ -137,4 +139,49 @@ public void unbuffer() { ((CanUnbuffer) inputStream).unbuffer(); } } + + /** + * @param buf the ByteBuffer to receive the results of the read operation. + * @param position offset + * @return the number of bytes read, possibly zero, or -1 if + * reach end-of-stream + * @throws IOException if there is some error performing the read + */ + @Override + public int read(long position, ByteBuffer buf) throws IOException { + if (!buf.hasRemaining()) { // 0-length request: legitimate per the interface contract, read nothing + return 0; + } + long oldPos = this.getPos(); + int bytesRead; + try { + ((Seekable) inputStream).seek(position); // NOTE(review): temporarily moves the shared stream offset (restored in finally); not atomic w.r.t. concurrent readers although the interface javadoc claims thread-safety — confirm + bytesRead = ((ByteBufferReadable) inputStream).read(buf); + } catch (EOFException e) { + // Either position is negative or it has reached EOF + return -1; + } finally { + ((Seekable) inputStream).seek(oldPos); // always restore the caller-visible file position + } + return bytesRead; + } + + /** + * @param buf the ByteBuffer to receive the results of the read operation. 
+ * @param position offset + * @return void + * @throws IOException if there is some error performing the read + * @throws EOFException if end of file reached before reading fully + */ + @Override + public void readFully(long position, ByteBuffer buf) throws IOException { + int bytesRead; + for (int readCount = 0; buf.hasRemaining(); readCount += bytesRead) { + bytesRead = this.read(position + (long)readCount, buf); + if (bytesRead < 0) { + // Still buffer has space to read but stream has already reached EOF + throw new EOFException("End of file reached before reading fully."); + } + } + } }