diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 0dd3dcf065fbe..dd0aecef618ff 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -542,7 +542,7 @@ public synchronized void seek(long n) throws IOException { if (n < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } - if (n > contentLength) { + if (n > 0 && n >= contentLength) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } @@ -583,8 +583,8 @@ public synchronized long skip(long n) throws IOException { newPos = 0; n = newPos - currentPos; } - if (newPos > contentLength) { - newPos = contentLength; + if (newPos > 0 && newPos >= contentLength) { + newPos = contentLength - 1; n = newPos - currentPos; } seek(newPos); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index a33a76ecefe77..8289cd299ded8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; @@ -33,6 +34,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.io.IOUtils; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; + public class ITestAbfsInputStreamStatistics extends 
AbstractAbfsIntegrationTest { private static final int OPERATIONS = 10; @@ -89,11 +92,13 @@ public void testInitValues() throws IOException { * Test to check statistics from seek operation in AbfsInputStream. */ @Test - public void testSeekStatistics() throws IOException { + public void testSeekStatistics() throws Exception { describe("Testing the values of statistics from seek operations in " + "AbfsInputStream"); - AzureBlobFileSystem fs = getFileSystem(); + Configuration config = getRawConfiguration(); + config.set(AZURE_READ_BUFFER_SIZE, String.valueOf(ONE_MB - 1)); + AzureBlobFileSystem fs = getFileSystem(config); AzureBlobFileSystemStore abfss = fs.getAbfsStore(); Path seekStatPath = path(getMethodName()); @@ -112,7 +117,7 @@ public void testSeekStatistics() throws IOException { * Writing 1MB buffer to the file, this would make the fCursor(Current * position of cursor) to the end of file. */ - int result = in.read(defBuffer, 0, ONE_MB); + int result = in.read(defBuffer, 0, ONE_MB - 1); LOG.info("Result of read : {}", result); /* @@ -121,7 +126,7 @@ public void testSeekStatistics() throws IOException { */ for (int i = 0; i < OPERATIONS; i++) { in.seek(0); - in.seek(ONE_MB); + in.seek(ONE_MB - 1); } AbfsInputStreamStatisticsImpl stats = @@ -159,7 +164,7 @@ public void testSeekStatistics() throws IOException { assertEquals("Mismatch in forwardSeekOps value", OPERATIONS, stats.getForwardSeekOperations()); assertEquals("Mismatch in bytesBackwardsOnSeek value", - OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek()); + OPERATIONS * (ONE_MB - 1), stats.getBytesBackwardsOnSeek()); assertEquals("Mismatch in bytesSkippedOnSeek value", 0, stats.getBytesSkippedOnSeek()); assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS, @@ -279,7 +284,7 @@ public void testWithNullStreamStatistics() throws IOException { // Verifying that AbfsInputStream Operations works with null statistics. 
assertNotEquals("AbfsInputStream read() with null statistics should " + "work", -1, in.read()); - in.seek(ONE_KB); + in.seek(ONE_KB - 1); // Verifying toString() with no StreamStatistics. LOG.info("AbfsInputStream: {}", in.toString()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index 52abb097ef311..b76c22311d259 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.EOFException; +import java.io.IOException; import java.util.Arrays; import java.util.Random; @@ -33,6 +35,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test read, write and seek. 
@@ -70,21 +73,32 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { abfsConfiguration.setReadBufferSize(bufferSize); - final byte[] b = new byte[2 * bufferSize]; + int contentLength = 2 * bufferSize; + final byte[] b = new byte[contentLength]; new Random().nextBytes(b); try (FSDataOutputStream stream = fs.create(TEST_PATH)) { stream.write(b); } - final byte[] readBuffer = new byte[2 * bufferSize]; + final byte[] readBuffer = new byte[contentLength]; int result; try (FSDataInputStream inputStream = fs.open(TEST_PATH)) { + //seek to file mid and read until (excluding) the last byte inputStream.seek(bufferSize); - result = inputStream.read(readBuffer, bufferSize, bufferSize); + result = inputStream.read(readBuffer, bufferSize, bufferSize - 1); assertNotEquals(-1, result); + //seek to first byte and read till file mid inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); + //test seek beyond EOF handling + intercept(EOFException.class, () -> inputStream.seek(contentLength)); + //seek to last valid position and read + inputStream.seek(contentLength - 1); + result = inputStream.read(readBuffer, contentLength - 1, 1); + assertNotEquals("Read should succeed for last byte", -1, result); + //negative seek + intercept(IOException.class, () -> inputStream.seek(-1)); } assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index ef531acb2bbbc..c91c15c937a75 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -203,23 +203,36 @@ public void testSkipBounds() throws Exception { 
assertTrue(testFileLength > 0); - skipped = inputStream.skip(testFileLength); - assertEquals(testFileLength, skipped); + //test skip to EOF with correct input skip count + assertEquals("Position should be 0", 0, inputStream.getPos()); + inputStream.skip(testFileLength - 1); + assertEquals("Position should be EOF", testFileLength - 1, + inputStream.getPos()); - intercept(EOFException.class, - new Callable() { - @Override - public Long call() throws Exception { - return inputStream.skip(1); - } - } - ); long elapsedTimeMs = timer.elapsedTimeMs(); assertTrue( - String.format( - "There should not be any network I/O (elapsedTimeMs=%1$d).", - elapsedTimeMs), - elapsedTimeMs < MAX_ELAPSEDTIMEMS); + String.format( + "There should not be any network I/O (elapsedTimeMs=%1$d).", + elapsedTimeMs), + elapsedTimeMs < MAX_ELAPSEDTIMEMS); + + //test negative skip from last valid position + skipped = inputStream.skip(-testFileLength + 1); + assertEquals("Incorrect skip count", -testFileLength + 1, skipped); + assertEquals("Position should be 0", 0, inputStream.getPos()); + + //test large positive skip from position 0 beyond EOF + skipped = inputStream.skip(testFileLength); + assertEquals("Incorrect skip count", testFileLength - 1, skipped); + assertEquals("One byte should be available after skip to EOF", 1, + inputStream.available()); + + //test positive skip from contentlength position (EOF + 1) + inputStream.read(); //read 1 byte from EOF + assertEquals("Position should be testFileLength", testFileLength, + inputStream.getPos()); + intercept(EOFException.class, FSExceptionMessages.CANNOT_SEEK_PAST_EOF, + () -> inputStream.skip(1)); } } @@ -251,15 +264,15 @@ public FSDataInputStream call() throws Exception { ); assertTrue("Test file length only " + testFileLength, testFileLength > 0); - inputStream.seek(testFileLength); - assertEquals(testFileLength, inputStream.getPos()); + inputStream.seek(testFileLength - 1); + assertEquals(testFileLength - 1, inputStream.getPos()); 
intercept(EOFException.class, FSExceptionMessages.CANNOT_SEEK_PAST_EOF, new Callable() { @Override public FSDataInputStream call() throws Exception { - inputStream.seek(testFileLength + 1); + inputStream.seek(testFileLength); return inputStream; } }
@@ -405,6 +418,38 @@ public void testSkipAndAvailableAndPosition() throws Exception {
     }
   }
 
+  /**
+   * Verifies input stream behaviour on a zero-byte file: the initial
+   * position is 0, seek(0) and skip(0) leave it at 0, available() is 0,
+   * seek(1), seek(-1) and skip(1) are expected to throw EOFException, and
+   * skip(-1) is expected to return 0 with the position still at 0.
+   */
+  @Test
+  public void testZeroByteFile() throws Exception {
+    Path emptyFile = new Path("/emptyFile");
+    //close the output stream right away so that it is not leaked
+    getFileSystem().create(emptyFile).close();
+    try (FSDataInputStream in = getFileSystem().open(emptyFile)) {
+      assertEquals("Initial position of inputstream in empty file is 0", 0,
+          in.getPos());
+      in.seek(0);
+      assertEquals("Seek to 0 should succeed", 0, in.getPos());
+      long skipped = in.skip(0);
+      assertEquals("Number of skipped bytes should be 0", 0, skipped);
+      assertEquals("Position should be 0 post skip 0", 0, in.getPos());
+      assertEquals("Available bytes in empty file is 0", 0, in.available());
+
+      intercept(EOFException.class, () -> in.seek(1));
+      intercept(EOFException.class, () -> in.seek(-1));
+      //skip(1) from position 0 does not seek(0) since pos = contentlength
+      intercept(EOFException.class, () -> in.skip(1));
+      skipped = in.skip(-1);
+      assertEquals("Number of skipped bytes should be 0", 0, skipped);
+      assertEquals("Should seek to 0", 0, in.getPos());
+    }
+  }
+
   /**
    * Ensures parity in the performance of sequential read after reverse seek for
    * abfs of the AbfsInputStream.