diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java index 502156d0..25b51258 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java @@ -15,12 +15,35 @@ */ package software.amazon.s3.analyticsaccelerator.request; +import lombok.AllArgsConstructor; + /** * Enum to help with the annotation of reads. We mark reads SYNC when they were triggered by a * synchronous read or ASYNC when they were to do logical or physical prefetching. */ +@AllArgsConstructor public enum ReadMode { - SYNC, - ASYNC, - SMALL_OBJECT_PREFETCH; + SYNC(true), + ASYNC(true), + SMALL_OBJECT_PREFETCH(true), + SEQUENTIAL_FILE_PREFETCH(true), + DICTIONARY_PREFETCH(false), + COLUMN_PREFETCH(false), + REMAINING_COLUMN_PREFETCH(false), + PREFETCH_TAIL(false), + READ_VECTORED(false); + + private final boolean allowRequestExtension; + + /** + * Should requests be extended for this read mode? + * + *
When the read is from the parquet prefetcher or readVectored(), we know the exact ranges we + * want to read, so in this case don't extend the ranges. + * + * @return true if requests should be extended + */ + public boolean allowRequestExtension() { + return allowRequestExtension; + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/MetricKey.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/MetricKey.java index 41910498..0b40fecf 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/MetricKey.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/MetricKey.java @@ -35,7 +35,13 @@ public enum MetricKey { /** * Tracks the number of cache misses. Incremented when requested block is not found in the cache */ - CACHE_MISS("CacheMiss"); + CACHE_MISS("CacheMiss"), + + /** Counts number of GET requests made. */ + GET_REQUEST_COUNT("GetRequestCount"), + + /** Counts number of HEAD requests made. */ + HEAD_REQUEST_COUNT("HeadRequestCount"); /** The string name representation of the metric. */ private final String name; diff --git a/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/MetricKeyTest.java b/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/MetricKeyTest.java index 5b7a46fd..fa6f7c0c 100644 --- a/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/MetricKeyTest.java +++ b/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/MetricKeyTest.java @@ -27,14 +27,18 @@ public void testMetricKeyNames() { assertEquals("MemoryUsage", MetricKey.MEMORY_USAGE.getName()); assertEquals("CacheHit", MetricKey.CACHE_HIT.getName()); assertEquals("CacheMiss", MetricKey.CACHE_MISS.getName()); + assertEquals("GetRequestCount", MetricKey.GET_REQUEST_COUNT.getName()); + assertEquals("HeadRequestCount", MetricKey.HEAD_REQUEST_COUNT.getName()); } @Test public void testEnumValues() { MetricKey[] values = MetricKey.values(); - assertEquals(3, values.length); + assertEquals(5, values.length); assertEquals(MetricKey.MEMORY_USAGE, values[0]); assertEquals(MetricKey.CACHE_HIT, values[1]); assertEquals(MetricKey.CACHE_MISS, values[2]); + assertEquals(MetricKey.GET_REQUEST_COUNT, values[3]); + assertEquals(MetricKey.HEAD_REQUEST_COUNT, values[4]); } } diff --git a/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java b/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java index 4d851b91..6b67f945 100644 --- a/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java +++ b/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/IntegrationTestBase.java @@ -27,12 +27,10 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.IntFunction; import java.util.stream.Stream; import lombok.NonNull; import org.junit.jupiter.api.AfterEach; @@ -45,7 +43,6 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; -import software.amazon.s3.analyticsaccelerator.common.ObjectRange; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; 
import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -187,58 +184,6 @@ protected void testChangingEtagMidStream( } } - /** - * This test verifies that the data in the buffers is the same when a file is read through - * readVectored() vs stream.read(buf[], off, len). - * - * @param s3ClientKind S3 client kind to use - * @param s3Object S3 object to read - * @param streamReadPatternKind stream read pattern to apply - * @param AALInputStreamConfigurationKind configuration kind - * @param allocate method to allocate the buffer, can be direct or non-direct - * @throws IOException on any IOException - */ - protected void testReadVectored( - @NonNull S3ClientKind s3ClientKind, - @NonNull S3Object s3Object, - @NonNull StreamReadPatternKind streamReadPatternKind, - @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, - @NonNull IntFunction allocate) - throws IOException { - - try (S3AALClientStreamReader s3AALClientStreamReader = - this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { - - S3SeekableInputStream s3SeekableInputStream = - s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT); - - List objectRanges = new ArrayList<>(); - objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, 500)); - objectRanges.add(new ObjectRange(new CompletableFuture<>(), 1000, 800)); - objectRanges.add(new ObjectRange(new CompletableFuture<>(), 4000, 5000)); - - s3SeekableInputStream.readVectored( - objectRanges, - allocate, - (buffer) -> { - LOG.debug("Release buffer of length {}: {}", buffer.limit(), buffer); - }); - - for (ObjectRange objectRange : objectRanges) { - ByteBuffer byteBuffer = objectRange.getByteBuffer().join(); - - S3SeekableInputStream verificationStream = - s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT); - verificationStream.seek(objectRange.getOffset()); - byte[] buffer = new byte[objectRange.getLength()]; - int readBytes = verificationStream.read(buffer, 0, buffer.length); - - assertEquals(readBytes, buffer.length); - verifyBufferContentsEqual(byteBuffer, buffer); - } - } - } - /** * Used to read and assert helps when we want to run it in a lambda. 
* @@ -254,18 +199,6 @@ private void readAndAssert(S3SeekableInputStream stream, byte[] buffer, int offs assertEquals(readBytes, len); } - /** - * Verify the contents of two buffers are equal - * - * @param buffer ByteBuffer to verify contents for - * @param expected expected contents in byte buffer - */ - private void verifyBufferContentsEqual(ByteBuffer buffer, byte[] expected) { - for (int i = 0; i < expected.length; i++) { - assertEquals(buffer.get(i), expected[i]); - } - } - /** * Tests to make sure if we have read our whole object we pass and return our cached data even if * the etag has changed after the read is complete diff --git a/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java b/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java index 3d904105..7fcd97d0 100644 --- a/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java +++ b/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java @@ -15,16 +15,37 @@ */ package software.amazon.s3.analyticsaccelerator.access; +import static org.junit.jupiter.api.Assertions.*; +import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB; + +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.*; +import java.util.function.Consumer; +import java.util.function.IntFunction; import java.util.stream.Stream; +import lombok.NonNull; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.common.ObjectRange; +import software.amazon.s3.analyticsaccelerator.util.MetricKey; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; public class ReadVectoredTest extends IntegrationTestBase { + private static final Logger LOG = LoggerFactory.getLogger(ReadVectoredTest.class); + private static final Consumer LOG_BYTE_BUFFER_RELEASED = + (buffer) -> { + LOG.debug("Release buffer of length {}: {}", buffer.limit(), buffer); + }; + @ParameterizedTest @MethodSource("vectoredReads") void testVectoredReads( @@ -33,23 +54,426 @@ void testVectoredReads( StreamReadPatternKind streamReadPattern, AALInputStreamConfigurationKind configuration) throws IOException { + // Run with non-direct buffers testReadVectored( s3ClientKind, s3Object, streamReadPattern, configuration, ByteBuffer::allocate); + // Run with direct buffers testReadVectored( s3ClientKind, s3Object, streamReadPattern, configuration, ByteBuffer::allocateDirect); } + @ParameterizedTest + @MethodSource("vectoredReads") + void testVectoredReadsInSingleBlock( + S3ClientKind s3ClientKind, + S3Object s3Object, + StreamReadPatternKind streamReadPattern, + AALInputStreamConfigurationKind configuration) + throws IOException { + + testReadVectoredInSingleBlock( + s3ClientKind, s3Object, streamReadPattern, configuration, ByteBuffer::allocate); + + testReadVectoredInSingleBlock( + s3ClientKind, s3Object, streamReadPattern, configuration, ByteBuffer::allocateDirect); + } + + @ParameterizedTest + @MethodSource("vectoredReads") + void 
testVectoredReadsForSequentialRanges( + S3ClientKind s3ClientKind, + S3Object s3Object, + StreamReadPatternKind streamReadPattern, + AALInputStreamConfigurationKind configuration) + throws IOException { + + testReadVectoredForSequentialRanges( + s3ClientKind, s3Object, streamReadPattern, configuration, ByteBuffer::allocate); + + testReadVectoredForSequentialRanges( + s3ClientKind, s3Object, streamReadPattern, configuration, ByteBuffer::allocateDirect); + } + + @Test + void testEmptyRanges() throws IOException { + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader( + S3ClientKind.SDK_V2_JAVA_ASYNC, AALInputStreamConfigurationKind.READ_CORRECTNESS)) { + + IntFunction allocate = ByteBuffer::allocate; + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream( + S3Object.RANDOM_1GB, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED); + + assertEquals(0, objectRanges.size()); + + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 0); + } + } + + @Test + void testEoFRanges() throws IOException { + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader( + S3ClientKind.SDK_V2_JAVA_ASYNC, AALInputStreamConfigurationKind.READ_CORRECTNESS)) { + + IntFunction allocate = ByteBuffer::allocate; + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream( + S3Object.RANDOM_1GB, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + + objectRanges.add( + new ObjectRange(new CompletableFuture<>(), SizeConstants.ONE_GB_IN_BYTES + 1, 500)); + + assertThrows( + EOFException.class, + () -> + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED)); + + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 0); + } + } + + @Test + void testNullRange() throws IOException { + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader( + S3ClientKind.SDK_V2_JAVA_ASYNC, AALInputStreamConfigurationKind.READ_CORRECTNESS)) { + + IntFunction allocate = ByteBuffer::allocate; + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream( + S3Object.RANDOM_1GB, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 700, 500)); + objectRanges.add(null); + + assertThrows( + NullPointerException.class, + () -> + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED)); + + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 0); + } + } + + @Test + void testOverlappingRanges() throws IOException { + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader( + S3ClientKind.SDK_V2_JAVA_ASYNC, AALInputStreamConfigurationKind.READ_CORRECTNESS)) { + + IntFunction allocate = ByteBuffer::allocate; + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream( + S3Object.RANDOM_1GB, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 700, 500)); + objectRanges.add(new 
ObjectRange(new CompletableFuture<>(), 4000, 500)); + // overlaps with the first range + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 900, 500)); + + assertThrows( + IllegalArgumentException.class, + () -> + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED)); + + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 0); + } + } + + @Test + void testSomeRangesFail() throws IOException { + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader( + S3ClientKind.FAULTY_S3_CLIENT, AALInputStreamConfigurationKind.NO_RETRY)) { + + IntFunction allocate = ByteBuffer::allocate; + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream( + S3Object.RANDOM_1GB, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 700, 500)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 100 * ONE_MB, 500)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 500 * ONE_MB, 500)); + + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED); + + assertThrows(CompletionException.class, () -> objectRanges.get(0).getByteBuffer().join()); + assertDoesNotThrow(() -> objectRanges.get(1).getByteBuffer().join()); + assertDoesNotThrow(() -> objectRanges.get(2).getByteBuffer().join()); + + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 3); + } + } + + @Test + void testTwoConcurrentStreams() throws IOException, ExecutionException, InterruptedException { + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader( + S3ClientKind.SDK_V2_JAVA_ASYNC, AALInputStreamConfigurationKind.READ_CORRECTNESS)) { + + ExecutorService threadPool = Executors.newFixedThreadPool(5); + + // Do three readVectored() concurrently + Future x = threadPool.submit(() -> performReadVectored(s3AALClientStreamReader)); + Future y = threadPool.submit(() -> performReadVectored(s3AALClientStreamReader)); + Future z = threadPool.submit(() -> performReadVectored(s3AALClientStreamReader)); + + x.get(); + y.get(); + z.get(); + + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 3); + } + } + + private void performReadVectored(S3AALClientStreamReader s3AALClientStreamReader) { + + try { + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream( + S3Object.RANDOM_1GB, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 700, 500)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 100 * ONE_MB, 500)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 500 * ONE_MB, 500)); + + s3SeekableInputStream.readVectored( + objectRanges, ByteBuffer::allocate, LOG_BYTE_BUFFER_RELEASED); + + for (ObjectRange objectRange : objectRanges) { + objectRange.getByteBuffer().join(); + } + } catch (IOException e) { + // Do nothing + } + } + static Stream vectoredReads() { List readVectoredObjects = new ArrayList<>(); readVectoredObjects.add(S3Object.RANDOM_1GB); - readVectoredObjects.add(S3Object.CSV_20MB); + + List s3ClientKinds = new ArrayList<>(); + s3ClientKinds.add(S3ClientKind.SDK_V2_JAVA_ASYNC); return argumentsFor( - 
getS3ClientKinds(), + s3ClientKinds, readVectoredObjects, sequentialPatterns(), getS3SeekableInputStreamConfigurations()); } + + /** + * This test verifies that the data in the buffers is the same when a file is read through + * readVectored() vs stream.read(buf[], off, len). + * + * @param s3ClientKind S3 client kind to use + * @param s3Object S3 object to read + * @param streamReadPatternKind stream read pattern to apply + * @param AALInputStreamConfigurationKind configuration kind + * @param allocate method to allocate the buffer, can be direct or non-direct + * @throws IOException on any IOException + */ + protected void testReadVectored( + @NonNull S3ClientKind s3ClientKind, + @NonNull S3Object s3Object, + @NonNull StreamReadPatternKind streamReadPatternKind, + @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, + @NonNull IntFunction allocate) + throws IOException { + + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, ONE_MB)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2 * ONE_MB, 800)); + + // a range that should be within a single block + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 200 * ONE_MB, 8 * ONE_MB)); + + // a range that spans multiple blocks + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 260 * ONE_MB, 24 * ONE_MB)); + + s3SeekableInputStream.readVectored( + objectRanges, + allocate, + (buffer) -> { + LOG.debug("Release buffer of length {}: {}", buffer.limit(), buffer); + }); + + // Join on the buffers to ensure the vectored reads happen as they happen in an async thread + // pool. + for (ObjectRange objectRange : objectRanges) { + objectRange.getByteBuffer().join(); + } + + // Ranges [50, 50 + 1MB] and [2MB, 2MB + 800] will make 2 GET requests + // Range [200MB - 208MB] will make a single GET as it is an 8MB block. + // Range [260MB - 284MB] will make 3 GET requests + assertEquals( + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT), + 6); + + verifyStreamContents(objectRanges, s3AALClientStreamReader, s3Object); + } + } + + protected void testReadVectoredInSingleBlock( + @NonNull S3ClientKind s3ClientKind, + @NonNull S3Object s3Object, + @NonNull StreamReadPatternKind streamReadPatternKind, + @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, + @NonNull IntFunction allocate) + throws IOException { + + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 500, 800)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2000, 200)); + + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED); + + // Join on the buffers to ensure the vectored reads happen as they happen in an async thread + // pool. 
+ for (ObjectRange objectRange : objectRanges) { + objectRange.getByteBuffer().join(); + } + + assertEquals( + 1, + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT)); + } + } + + protected void testReadVectoredForSequentialRanges( + @NonNull S3ClientKind s3ClientKind, + @NonNull S3Object s3Object, + @NonNull StreamReadPatternKind streamReadPatternKind, + @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, + @NonNull IntFunction allocate) + throws IOException { + + try (S3AALClientStreamReader s3AALClientStreamReader = + this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { + + S3SeekableInputStream s3SeekableInputStream = + s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT); + + List objectRanges = new ArrayList<>(); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2 * ONE_MB, 8 * ONE_MB)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 10 * ONE_MB, ONE_MB)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 12 * ONE_MB, 5 * ONE_MB)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 17 * ONE_MB, 4 * ONE_MB)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 21 * ONE_MB, 8 * ONE_MB)); + + s3SeekableInputStream.readVectored(objectRanges, allocate, LOG_BYTE_BUFFER_RELEASED); + + // Join on the buffers to ensure the vectored reads happen as they happen in an async thread + // pool. + for (ObjectRange objectRange : objectRanges) { + objectRange.getByteBuffer().join(); + } + + assertEquals( + 5, + s3AALClientStreamReader + .getS3SeekableInputStreamFactory() + .getMetrics() + .get(MetricKey.GET_REQUEST_COUNT)); + } + } + + private void verifyStreamContents( + List objectRanges, + S3AALClientStreamReader s3AALClientStreamReader, + S3Object s3Object) + throws IOException { + for (ObjectRange objectRange : objectRanges) { + ByteBuffer byteBuffer = objectRange.getByteBuffer().join(); + + S3SeekableInputStream verificationStream = + s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT); + verificationStream.seek(objectRange.getOffset()); + byte[] buffer = new byte[objectRange.getLength()]; + int readBytes = verificationStream.read(buffer, 0, buffer.length); + + assertEquals(readBytes, buffer.length); + verifyBufferContentsEqual(byteBuffer, buffer); + } + } + + /** + * Verify the contents of two buffers are equal + * + * @param buffer ByteBuffer to verify contents for + * @param expected expected contents in byte buffer + */ + private void verifyBufferContentsEqual(ByteBuffer buffer, byte[] expected) { + for (int i = 0; i < expected.length; i++) { + assertEquals(buffer.get(i), expected[i]); + } + } } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index 6b31cd9c..2759680d 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -57,7 +57,7 @@ public class S3SeekableInputStreamFactory implements AutoCloseable { private final BlobStore objectBlobStore; private final Telemetry telemetry; private final ObjectFormatSelector objectFormatSelector; - private final Metrics metrics; + @Getter private final Metrics metrics; private final 
ExecutorService threadPool; private static final Logger LOG = LoggerFactory.getLogger(S3SeekableInputStreamFactory.class); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcher.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcher.java index 2be44636..f58d3c9e 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcher.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcher.java @@ -24,6 +24,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -85,7 +86,7 @@ public void prefetch(long position) { .build(), () -> { IOPlan prefetchPlan = new IOPlan(new Range(position, endPosition - 1)); - return physicalIO.execute(prefetchPlan); + return physicalIO.execute(prefetchPlan, ReadMode.SEQUENTIAL_FILE_PREFETCH); }); } catch (Exception e) { // Log the exception at debug level and swallow it diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java index a9ef8311..9a88013c 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java @@ -34,6 +34,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -255,13 +256,13 @@ public IOPlanExecution prefetchRecentColumns( IOPlan dictionaryIoPlan = (dictionaryRanges.isEmpty()) ? IOPlan.EMPTY_PLAN : new IOPlan(dictionaryRanges); - physicalIO.execute(dictionaryIoPlan); + physicalIO.execute(dictionaryIoPlan, ReadMode.DICTIONARY_PREFETCH); IOPlan columnIoPlan = (columnRanges.isEmpty()) ? 
IOPlan.EMPTY_PLAN : new IOPlan(ParquetUtils.mergeRanges(columnRanges)); - return physicalIO.execute(columnIoPlan); + return physicalIO.execute(columnIoPlan, ReadMode.COLUMN_PREFETCH); } catch (Throwable t) { LOG.debug("Unable to prefetch columns for {}.", this.s3Uri.getKey(), t); return IOPlanExecution.builder().state(IOPlanState.SKIPPED).build(); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTask.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTask.java index 16b00abf..b65fa3d7 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTask.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTask.java @@ -27,6 +27,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -105,7 +106,7 @@ private IOPlanExecution executeRemainingColumnPrefetchPlan( long startRange = position + len; long endRange = startRange + (columnMetadata.getCompressedSize() - len); IOPlan ioPlan = new IOPlan(new Range(startRange, endRange)); - return physicalIO.execute(ioPlan); + return physicalIO.execute(ioPlan, ReadMode.REMAINING_COLUMN_PREFETCH); } return IOPlanExecution.builder().state(IOPlanState.SKIPPED).build(); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTask.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTask.java index e416e555..aaef67c5 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTask.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTask.java @@ -26,6 +26,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -76,7 +77,7 @@ public List prefetchTail() { ParquetUtils.getFileTailPrefetchRanges(logicalIOConfiguration, 0, contentLength); IOPlan ioPlan = new IOPlan(ranges); // Create a non-empty IOPlan only if we have a valid range to work with - physicalIO.execute(ioPlan); + physicalIO.execute(ioPlan, ReadMode.PREFETCH_TAIL); return ioPlan.getPrefetchRanges(); } catch (Exception e) { LOG.debug( diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIO.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIO.java index c4c64a6e..9075c7a2 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIO.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIO.java @@ -23,6 
+23,7 @@ import software.amazon.s3.analyticsaccelerator.common.ObjectRange; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; /** An interface defining how a logical IO layer gets hooked into Physical IO. */ public interface PhysicalIO extends RandomAccessReadable { @@ -31,9 +32,10 @@ public interface PhysicalIO extends RandomAccessReadable { * Async method capable of executing a logical IO plan. * * @param ioPlan the plan to execute asynchronously + * @param readMode the read mode for which this IoPlan is being executed * @return an IOPlanExecution object tracking the execution of the submitted plan */ - IOPlanExecution execute(IOPlan ioPlan) throws IOException; + IOPlanExecution execute(IOPlan ioPlan, ReadMode readMode) throws IOException; /** * Fetches the list of provided ranges in parallel. Byte buffers are created using the allocate diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java index 8fe82116..fa1a6387 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java @@ -155,9 +155,10 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException { * Execute an IOPlan. * * @param plan the IOPlan to execute + * @param readMode the readMode for which this IoPlan is being executed * @return the status of execution */ - public IOPlanExecution execute(IOPlan plan) { + public IOPlanExecution execute(IOPlan plan, ReadMode readMode) throws IOException { return telemetry.measureStandard( () -> Operation.builder() @@ -169,8 +170,7 @@ public IOPlanExecution execute(IOPlan plan) { () -> { try { for (Range range : plan.getPrefetchRanges()) { - this.blockManager.makeRangeAvailable( - range.getStart(), range.getLength(), ReadMode.ASYNC); + this.blockManager.makeRangeAvailable(range.getStart(), range.getLength(), readMode); } return IOPlanExecution.builder().state(IOPlanState.SUBMITTED).build(); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java index aa02c776..7c3ca429 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java @@ -137,7 +137,10 @@ private void generateSourceAndData() throws IOException { .attribute(StreamAttributes.range(this.blockKey.getRange())) .attribute(StreamAttributes.generation(generation)) .build(), - objectClient.getObject(getRequest, openStreamInformation)); + () -> { + this.aggregatingMetrics.add(MetricKey.GET_REQUEST_COUNT, 1); + return objectClient.getObject(getRequest, openStreamInformation); + }); // Handle IOExceptions when converting stream to byte array this.data = diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index 8b2d4bc2..0bd47199 100644 --- 
a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -178,12 +178,8 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod // effectiveEnd of the requested range long effectiveEnd = pos + Math.max(len, configuration.getReadAheadBytes()) - 1; - // Check sequential prefetching. If read mode is ASYNC, that is the request is from the parquet - // prefetch path, then do not extend the request. - // TODO: Improve readModes, as tracked in - // https://github.com/awslabs/analytics-accelerator-s3/issues/195 final long generation; - if (readMode != ReadMode.ASYNC && patternDetector.isSequentialRead(pos)) { + if (readMode.allowRequestExtension() && patternDetector.isSequentialRead(pos)) { generation = patternDetector.getGeneration(pos); effectiveEnd = Math.max( diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index 9639f377..c6cca623 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -18,6 +18,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.function.IntFunction; @@ -35,6 +36,8 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -212,7 +215,7 @@ public int readTail(byte[] buf, int off, int len) throws IOException { * @return an IOPlanExecution object tracking the execution of the submitted plan */ @Override - public IOPlanExecution execute(IOPlan ioPlan) { + public IOPlanExecution execute(IOPlan ioPlan, ReadMode readMode) { return telemetry.measureVerbose( () -> Operation.builder() @@ -224,7 +227,10 @@ public IOPlanExecution execute(IOPlan ioPlan) { StreamAttributes.physicalIORelativeTimestamp( System.nanoTime() - physicalIOBirth)) .build(), - () -> blobStore.get(objectKey, this.metadata, openStreamInformation).execute(ioPlan)); + () -> + blobStore + .get(objectKey, this.metadata, openStreamInformation) + .execute(ioPlan, readMode)); } @SuppressFBWarnings( @@ -236,6 +242,8 @@ public void readVectored(List objectRanges, IntFunction throws IOException { Blob blob = blobStore.get(objectKey, this.metadata, openStreamInformation); + makeReadVectoredRangesAvailable(objectRanges); + for (ObjectRange objectRange : objectRanges) { ByteBuffer buffer = allocate.apply(objectRange.getLength()); threadPool.submit( @@ -253,10 +261,10 @@ public void readVectored(List objectRanges, IntFunction readIntoDirectBuffer(buffer, blob, objectRange); buffer.flip(); } else { + // there is no use of a temp byte buffer, 
or buffer.put() calls, + // so flip() is not needed. blob.read(buffer.array(), 0, objectRange.getLength(), objectRange.getOffset()); } - // there is no use of a temp byte buffer, or buffer.put() calls, - // so flip() is not needed. objectRange.getByteBuffer().complete(buffer); } catch (Exception e) { objectRange.getByteBuffer().completeExceptionally(e); @@ -289,6 +297,23 @@ private void readIntoDirectBuffer(ByteBuffer buffer, Blob blob, ObjectRange rang } } + /** + * Does the block creation for the read vectored ranges. + * + * @param objectRanges Vectored ranges to fetch + */ + private void makeReadVectoredRangesAvailable(List objectRanges) { + List ranges = new ArrayList<>(); + + for (ObjectRange objectRange : objectRanges) { + ranges.add( + new Range( + objectRange.getOffset(), objectRange.getOffset() + objectRange.getLength() - 1)); + } + + execute(new IOPlan(ranges), ReadMode.READ_VECTORED); + } + private void handleOperationExceptions(Exception e) { if (e.getCause() != null && e.getCause().getMessage() != null diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java index e2040235..91d1fbd0 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java @@ -198,6 +198,19 @@ void testReadWithBufferNulls() throws IOException { } } + @Test + void testNullRangeList() throws IOException { + try (S3SeekableInputStream stream = getTestStream()) { + assertThrows( + NullPointerException.class, + () -> stream.readVectored(null, ByteBuffer::allocate, (buffer) -> {})); + + assertThrows( + NullPointerException.class, + () -> stream.readVectored(new ArrayList<>(), null, (buffer) -> {})); + } + } + @Test void testReadWithBuffer() throws IOException { try (S3SeekableInputStream stream = getTestStream()) { diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java index d35cf5f4..8257261a 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java @@ -45,6 +45,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -520,7 +521,8 @@ public void prefetchFooterAndBuildMetadataParseMetadataExceptionCaught() throws .thenReturn(new FileTail(ByteBuffer.wrap(new byte[5]), 5)); when(parquetMetadataParsingTask.storeColumnMappers(any(FileTail.class))) .thenThrow(new CompletionException("Error", new IOException())); - when(physicalIO.execute(any(IOPlan.class))).thenReturn(skippedIoPlanExecution); + when(physicalIO.execute(any(IOPlan.class), any(ReadMode.class))) + .thenReturn(skippedIoPlanExecution); assertEquals(parquetPrefetcher.prefetchFooterAndBuildMetadata().join(), skippedIoPlanExecution); } 
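For reference, a minimal usage sketch (not part of this diff) of how the read-vectored path and the new GET request counter surface to a caller, based on the APIs exercised in the tests above. The stream, factory, and LOG variables are assumed to be an existing S3SeekableInputStream, its S3SeekableInputStreamFactory, and an SLF4J logger; imports mirror those in ReadVectoredTest, and the enclosing method is assumed to declare throws IOException.

    List<ObjectRange> ranges = new ArrayList<>();
    // Each ObjectRange carries a CompletableFuture that is completed once its buffer has been filled.
    ranges.add(new ObjectRange(new CompletableFuture<>(), 0, 1024));
    ranges.add(new ObjectRange(new CompletableFuture<>(), 8 * 1024 * 1024, 2048));

    // READ_VECTORED does not allow request extension, so only the exact ranges are fetched.
    stream.readVectored(ranges, ByteBuffer::allocate, buffer -> LOG.debug("Released buffer {}", buffer));

    for (ObjectRange range : ranges) {
      ByteBuffer data = range.getByteBuffer().join(); // block until the asynchronous fetch completes
    }

    // GET requests issued on behalf of the factory are now visible through its aggregated metrics.
    LOG.debug("GETs issued: {}", factory.getMetrics().get(MetricKey.GET_REQUEST_COUNT));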
diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcherTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcherTest.java index e4fb563d..62e72a12 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcherTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/SequentialPrefetcherTest.java @@ -33,6 +33,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -77,7 +78,8 @@ void testPrefetchFunctionality() throws IOException { ObjectMetadata metadata = mock(ObjectMetadata.class); when(metadata.getContentLength()).thenReturn(10000L); when(physicalIO.metadata()).thenReturn(metadata); - when(physicalIO.execute(any(IOPlan.class))).thenReturn(mock(IOPlanExecution.class)); + when(physicalIO.execute(any(IOPlan.class), any(ReadMode.class))) + .thenReturn(mock(IOPlanExecution.class)); SequentialPrefetcher prefetcher = new SequentialPrefetcher(TEST_URI, physicalIO, TestTelemetry.DEFAULT, config); @@ -85,7 +87,8 @@ void testPrefetchFunctionality() throws IOException { prefetcher.prefetch(0); ArgumentCaptor ioPlanCaptor = ArgumentCaptor.forClass(IOPlan.class); - verify(physicalIO).execute(ioPlanCaptor.capture()); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); + verify(physicalIO).execute(ioPlanCaptor.capture(), readModeCaptor.capture()); IOPlan capturedPlan = ioPlanCaptor.getValue(); List ranges = capturedPlan.getPrefetchRanges(); @@ -94,6 +97,7 @@ void testPrefetchFunctionality() throws IOException { Range range = ranges.get(0); assertEquals(0, range.getStart()); assertEquals(4095, range.getEnd()); + assertEquals(readModeCaptor.getValue(), ReadMode.SEQUENTIAL_FILE_PREFETCH); } @Test @@ -104,7 +108,8 @@ void testPrefetchNearEndOfFile() throws IOException { ObjectMetadata metadata = mock(ObjectMetadata.class); when(metadata.getContentLength()).thenReturn(3000L); when(physicalIO.metadata()).thenReturn(metadata); - when(physicalIO.execute(any(IOPlan.class))).thenReturn(mock(IOPlanExecution.class)); + when(physicalIO.execute(any(IOPlan.class), any(ReadMode.class))) + .thenReturn(mock(IOPlanExecution.class)); SequentialPrefetcher prefetcher = new SequentialPrefetcher(TEST_URI, physicalIO, TestTelemetry.DEFAULT, config); @@ -112,7 +117,8 @@ void testPrefetchNearEndOfFile() throws IOException { prefetcher.prefetch(2000); ArgumentCaptor ioPlanCaptor = ArgumentCaptor.forClass(IOPlan.class); - verify(physicalIO).execute(ioPlanCaptor.capture()); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); + verify(physicalIO).execute(ioPlanCaptor.capture(), readModeCaptor.capture()); IOPlan capturedPlan = ioPlanCaptor.getValue(); List ranges = capturedPlan.getPrefetchRanges(); @@ -121,6 +127,7 @@ void testPrefetchNearEndOfFile() throws IOException { Range range = ranges.get(0); assertEquals(2000, range.getStart()); assertEquals(2999, range.getEnd()); + assertEquals(readModeCaptor.getValue(), ReadMode.SEQUENTIAL_FILE_PREFETCH); } @Test @@ -130,7 +137,7 @@ void testPrefetchWithIOException() throws IOException { ObjectMetadata metadata = 
mock(ObjectMetadata.class); when(metadata.getContentLength()).thenReturn(10000L); when(physicalIO.metadata()).thenReturn(metadata); - when(physicalIO.execute(any(IOPlan.class))) + when(physicalIO.execute(any(IOPlan.class), any(ReadMode.class))) .thenThrow(new IOException("Simulated IO exception")); SequentialPrefetcher prefetcher = @@ -140,6 +147,6 @@ void testPrefetchWithIOException() throws IOException { prefetcher.prefetch(0); // Verify that execute was called despite the exception - verify(physicalIO).execute(any(IOPlan.class)); + verify(physicalIO).execute(any(IOPlan.class), any(ReadMode.class)); } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTaskTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTaskTest.java index b96012a8..08dd5a05 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTaskTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTaskTest.java @@ -45,6 +45,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -182,13 +183,15 @@ void testRowGroupPrefetch() throws IOException { // Then: physical IO gets the correct plan. Only recent columns from the current row // group are prefetched. ArgumentCaptor ioPlanArgumentCaptor = ArgumentCaptor.forClass(IOPlan.class); - verify(physicalIO, times(2)).execute(ioPlanArgumentCaptor.capture()); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); + verify(physicalIO, times(2)).execute(ioPlanArgumentCaptor.capture(), readModeCaptor.capture()); IOPlan ioPlan = ioPlanArgumentCaptor.getValue(); List expectedRanges = new ArrayList<>(); expectedRanges.add(new Range(100, 599)); assertTrue(ioPlan.getPrefetchRanges().containsAll(expectedRanges)); + assertEquals(readModeCaptor.getValue(), ReadMode.COLUMN_PREFETCH); } @Test @@ -235,13 +238,18 @@ void testRowGroupPrefetchForOnlyDictionary() throws IOException { // Then: physical IO gets the correct plan. Only recent columns from the current row // group are prefetched. 
ArgumentCaptor ioPlanArgumentCaptor = ArgumentCaptor.forClass(IOPlan.class); - verify(physicalIO, times(2)).execute(ioPlanArgumentCaptor.capture()); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); + verify(physicalIO, times(2)).execute(ioPlanArgumentCaptor.capture(), readModeCaptor.capture()); IOPlan ioPlan = ioPlanArgumentCaptor.getAllValues().get(0); List expectedRanges = new ArrayList<>(); + List readModes = readModeCaptor.getAllValues(); + expectedRanges.add(new Range(100, 199)); assertTrue(ioPlan.getPrefetchRanges().containsAll(expectedRanges)); + assertEquals(readModes.get(0), ReadMode.DICTIONARY_PREFETCH); + assertEquals(readModes.get(1), ReadMode.COLUMN_PREFETCH); } @Test @@ -414,7 +422,8 @@ void testPrefetchRecentColumns() throws IOException { // Then: physical IO gets the correct plan ArgumentCaptor ioPlanArgumentCaptor = ArgumentCaptor.forClass(IOPlan.class); - verify(physicalIO, times(2)).execute(ioPlanArgumentCaptor.capture()); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); + verify(physicalIO, times(2)).execute(ioPlanArgumentCaptor.capture(), readModeCaptor.capture()); IOPlan ioPlan = ioPlanArgumentCaptor.getValue(); List expectedRanges = new ArrayList<>(); @@ -423,6 +432,7 @@ void testPrefetchRecentColumns() throws IOException { expectedRanges.add(new Range(100, 1099)); expectedRanges.add(new Range(1300, 1799)); assertTrue(ioPlan.getPrefetchRanges().containsAll(expectedRanges)); + assertEquals(readModeCaptor.getValue(), ReadMode.COLUMN_PREFETCH); } @Test @@ -438,7 +448,9 @@ void testExceptionInPrefetchingIsSwallowed() throws IOException { new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)); // When: the underlying PhysicalIO always throws - doThrow(new IOException("Error in prefetch")).when(physicalIO).execute(any(IOPlan.class)); + doThrow(new IOException("Error in prefetch")) + .when(physicalIO) + .execute(any(IOPlan.class), any(ReadMode.class)); assertEquals( IOPlanExecution.builder().state(IOPlanState.SKIPPED).build(), diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTaskTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTaskTest.java index 42ed54b3..91326b4a 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTaskTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchRemainingColumnTaskTest.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import software.amazon.s3.analyticsaccelerator.TestTelemetry; import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration; @@ -42,6 +43,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -121,8 +123,12 @@ void testRemainingColumnPrefetched() { TEST_URI, TestTelemetry.DEFAULT, mockedPhysicalIO, mockedParquetColumnPrefetchStore); 
parquetPrefetchRemainingColumnTask.prefetchRemainingColumnChunk(200, 5 * ONE_MB); - verify(mockedPhysicalIO).execute(any(IOPlan.class)); - verify(mockedPhysicalIO).execute(argThat(new IOPlanMatcher(expectedRanges))); + verify(mockedPhysicalIO).execute(any(IOPlan.class), any(ReadMode.class)); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); + + verify(mockedPhysicalIO) + .execute(argThat(new IOPlanMatcher(expectedRanges)), readModeCaptor.capture()); + assertEquals(readModeCaptor.getValue(), ReadMode.REMAINING_COLUMN_PREFETCH); } @Test @@ -143,7 +149,9 @@ void testExceptionInPrefetchingIsSwallowed() { new ParquetPrefetchRemainingColumnTask( TEST_URI, TestTelemetry.DEFAULT, mockedPhysicalIO, mockedParquetColumnPrefetchStore); - doThrow(new IOException("Error in prefetch")).when(mockedPhysicalIO).execute(any(IOPlan.class)); + doThrow(new IOException("Error in prefetch")) + .when(mockedPhysicalIO) + .execute(any(IOPlan.class), any(ReadMode.class)); assertEquals( IOPlanExecution.builder().state(IOPlanState.SKIPPED).build(), diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTaskTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTaskTest.java index 1622c991..d3833afd 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTaskTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetPrefetchTailTaskTest.java @@ -15,8 +15,7 @@ */ package software.amazon.s3.analyticsaccelerator.io.logical.parquet; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doThrow; @@ -34,6 +33,7 @@ import java.util.concurrent.CompletionException; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO; @@ -41,6 +41,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.request.Range; +import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -101,9 +102,14 @@ void testTailPrefetch() throws IOException { TEST_URI, Telemetry.NOOP, LogicalIOConfiguration.DEFAULT, mockedPhysicalIO); parquetPrefetchTailTask.prefetchTail(); - verify(mockedPhysicalIO).execute(any(IOPlan.class)); + verify(mockedPhysicalIO).execute(any(IOPlan.class), any(ReadMode.class)); + ArgumentCaptor readModeCaptor = ArgumentCaptor.forClass(ReadMode.class); verify(mockedPhysicalIO) - .execute(argThat(new IOPlanMatcher(contentLengthToRangeList.getValue()))); + .execute( + argThat(new IOPlanMatcher(contentLengthToRangeList.getValue())), + readModeCaptor.capture()); + + assertEquals(readModeCaptor.getValue(), ReadMode.PREFETCH_TAIL); } } @@ -119,7 +125,9 @@ void testExceptionRemappedToCompletionException() { // When: task executes but PhysicalIO throws 
ObjectMetadata metadata = ObjectMetadata.builder().contentLength(600).etag("random").build(); when(mockedPhysicalIO.metadata()).thenReturn(metadata); - doThrow(new IOException("Error in prefetch")).when(mockedPhysicalIO).execute(any(IOPlan.class)); + doThrow(new IOException("Error in prefetch")) + .when(mockedPhysicalIO) + .execute(any(IOPlan.class), any(ReadMode.class)); // Then: exception is re-mapped to CompletionException assertThrows(CompletionException.class, () -> parquetPrefetchTailTask.prefetchTail()); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java index 19c842f8..b97df883 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java @@ -145,12 +145,12 @@ public void testExecuteSubmitsCorrectRanges() throws IOException { IOPlan ioPlan = new IOPlan(ranges); // When: the IOPlan is executed - IOPlanExecution execution = blob.execute(ioPlan); + IOPlanExecution execution = blob.execute(ioPlan, ReadMode.COLUMN_PREFETCH); // Then: correct ranges are submitted assertEquals(SUBMITTED, execution.getState()); - verify(blockManager).makeRangeAvailable(0, 101, ReadMode.ASYNC); - verify(blockManager).makeRangeAvailable(999, 2, ReadMode.ASYNC); + verify(blockManager).makeRangeAvailable(0, 101, ReadMode.COLUMN_PREFETCH); + verify(blockManager).makeRangeAvailable(999, 2, ReadMode.COLUMN_PREFETCH); } @Test @@ -254,7 +254,7 @@ public void testExecuteWithFailure() throws IOException { IOPlan ioPlan = new IOPlan(ranges); // When: executing plan that will fail - IOPlanExecution execution = blob.execute(ioPlan); + IOPlanExecution execution = blob.execute(ioPlan, ReadMode.COLUMN_PREFETCH); // Then: execution state is FAILED assertEquals(IOPlanState.FAILED, execution.getState()); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java index ebcf4f3f..da1ead42 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java @@ -30,6 +30,8 @@ import java.util.concurrent.*; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.s3.analyticsaccelerator.TestTelemetry; @@ -493,22 +495,63 @@ void testClose() throws IOException, InterruptedException { closeLatch.await(5, TimeUnit.SECONDS), "Close operation should complete within timeout"); } + @ParameterizedTest + @MethodSource("readModes") + @DisplayName("Test makeRangeAvailable with async read modes") + void testMakeRangeAvailableAsync(ReadMode readMode) throws IOException { + // Given + ObjectClient objectClient = mock(ObjectClient.class); + BlockManager blockManager = getTestBlockManager(objectClient, 100 * ONE_MB); + + // When + blockManager.makeRangeAvailable(0, 5 * ONE_MB, readMode); + blockManager.makeRangeAvailable(5 * ONE_MB, 3 * ONE_MB, readMode); + 
blockManager.makeRangeAvailable(8 * ONE_MB, 5 * ONE_MB, readMode); + + // Then + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GetRequest.class); + verify(objectClient, times(3)).getObject(requestCaptor.capture(), any()); + + List getRequestList = requestCaptor.getAllValues(); + + // Verify that prefetch modes don't trigger sequential prefetching + assertEquals(getRequestList.get(0).getRange().getLength(), 5 * ONE_MB); + assertEquals(getRequestList.get(1).getRange().getLength(), 3 * ONE_MB); + assertEquals(getRequestList.get(2).getRange().getLength(), 5 * ONE_MB); + } + @Test - @DisplayName("Test makeRangeAvailable with async read mode") - void testMakeRangeAvailableAsync() throws IOException { + @DisplayName("Test makeRangeAvailable with sync read mode") + void testMakeRangeAvailableSync() throws IOException { // Given ObjectClient objectClient = mock(ObjectClient.class); - BlockManager blockManager = getTestBlockManager(objectClient, 1024); + BlockManager blockManager = getTestBlockManager(objectClient, 100 * ONE_MB); // When - blockManager.makeRangeAvailable(0, 100, ReadMode.ASYNC); + blockManager.makeRangeAvailable(0, 5 * ONE_MB, ReadMode.SYNC); + blockManager.makeRangeAvailable(5 * ONE_MB, 3 * ONE_MB, ReadMode.SYNC); // Then ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GetRequest.class); - verify(objectClient).getObject(requestCaptor.capture(), any()); + verify(objectClient, times(2)).getObject(requestCaptor.capture(), any()); + + List getRequestList = requestCaptor.getAllValues(); + + // Verify that with the SYNC mode, sequential prefetching kicks in + assertEquals(getRequestList.get(0).getRange().getLength(), 5 * ONE_MB); + // Second request gets extended by 4MB to 9MB. + assertEquals(getRequestList.get(1).getRange().getLength(), 4 * ONE_MB + 1); + } + + private static List readModes() { + List readModes = new ArrayList<>(); + readModes.add(ReadMode.READ_VECTORED); + readModes.add(ReadMode.COLUMN_PREFETCH); + readModes.add(ReadMode.DICTIONARY_PREFETCH); + readModes.add(ReadMode.PREFETCH_TAIL); + readModes.add(ReadMode.REMAINING_COLUMN_PREFETCH); - // Verify that async mode doesn't trigger read ahead - assertEquals(1024, requestCaptor.getValue().getRange().getLength()); + return readModes; } @Test diff --git a/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java b/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java index ccc413ca..c5754586 100644 --- a/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java +++ b/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java @@ -29,7 +29,8 @@ public enum AALInputStreamConfigurationKind { DEFAULT("DEFAULT", S3SeekableInputStreamConfiguration.DEFAULT), GRAY_FAILURE("GRAY_FAILURE", grayFailureConfiguration()), READ_CORRECTNESS("READ_CORRECTNESS", readCorrectnessConfiguration()), - CONCURRENCY_CORRECTNESS("CONCURRENCY_CORRECTNESS", concurrencyCorrectnessConfiguration()); + CONCURRENCY_CORRECTNESS("CONCURRENCY_CORRECTNESS", concurrencyCorrectnessConfiguration()), + NO_RETRY("NO_RETRY", noRetryConfiguration()); private final String name; private final S3SeekableInputStreamConfiguration value; @@ -47,6 +48,16 @@ private static S3SeekableInputStreamConfiguration grayFailureConfiguration() { return S3SeekableInputStreamConfiguration.fromConfiguration(config); } + private 
static S3SeekableInputStreamConfiguration noRetryConfiguration() { + String configurationPrefix = "noRetry"; + Map customConfiguration = new HashMap<>(); + customConfiguration.put(configurationPrefix + ".physicalio.blockreadtimeout", "2000"); + customConfiguration.put(configurationPrefix + ".physicalio.blockreadretrycount", "1"); + ConnectorConfiguration config = + new ConnectorConfiguration(customConfiguration, configurationPrefix); + return S3SeekableInputStreamConfiguration.fromConfiguration(config); + } + private static S3SeekableInputStreamConfiguration readCorrectnessConfiguration() { String configurationPrefix = "readCorrectness"; Map customConfiguration = new HashMap<>();