diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/ObjectRange.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/ObjectRange.java index d1f2d18d3..bf6f667ed 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/ObjectRange.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/common/ObjectRange.java @@ -46,4 +46,10 @@ public ObjectRange(CompletableFuture byteBuffer, long offset, int le this.offset = offset; this.length = length; } + + @Override + public String toString() { + return String.format( + "offset: %d, length: %d, completable-future: %s", offset, length, byteBuffer); + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/Range.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/Range.java index c0b8f137d..a56f5275e 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/Range.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/Range.java @@ -25,7 +25,7 @@ * like "bytes=0-555" -- this is SDK detail we should not care about in layers above Object Client. */ @Value -public class Range { +public class Range implements Comparable<Range> { @Getter long start; @Getter long end; @@ -85,4 +85,16 @@ public String toString() { public String toHttpString() { return String.format(TO_HTTP_STRING_FORMAT, start, end); } + + /** + * Orders ranges by start position; ties are broken by end, keeping it consistent with equals. 
+ * + * @param other the range to compare against + * @return negative, zero, or positive as this range sorts before, equal to, or after other + */ + @Override + public int compareTo(Range other) { + int byStart = Long.compare(this.start, other.start); + return byStart != 0 ? byStart : Long.compare(this.end, other.end); + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java index 25b512589..747198934 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java @@ -23,17 +23,18 @@ */ @AllArgsConstructor public enum ReadMode { - SYNC(true), - ASYNC(true), - SMALL_OBJECT_PREFETCH(true), - SEQUENTIAL_FILE_PREFETCH(true), - DICTIONARY_PREFETCH(false), - COLUMN_PREFETCH(false), - REMAINING_COLUMN_PREFETCH(false), - PREFETCH_TAIL(false), - READ_VECTORED(false); + SYNC(true, false), + ASYNC(true, false), + SMALL_OBJECT_PREFETCH(true, false), + SEQUENTIAL_FILE_PREFETCH(true, false), + DICTIONARY_PREFETCH(false, false), + COLUMN_PREFETCH(false, true), + REMAINING_COLUMN_PREFETCH(false, false), + PREFETCH_TAIL(false, false), + READ_VECTORED(false, true); private final boolean allowRequestExtension; + private final boolean coalesceRequests; /** * Should requests be extended for this read mode? @@ -46,4 +47,17 @@ public enum ReadMode { public boolean allowRequestExtension() { return allowRequestExtension; } + + /** + * Should requests be coalesced for this read mode? + * + *

When the read is from the parquet prefetcher or readVectored(), we know the exact ranges we + * want to read, so in this case don't extend the ranges. Yet if we are reading consecutive + * columns we want to merge requests into one. + * + * @return true if requests should be coalesced + */ + public boolean coalesceRequests() { + return coalesceRequests; + } } diff --git a/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/ConnectorConfigurationTest.java b/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/ConnectorConfigurationTest.java index 10cb9f9f0..c87266bb2 100644 --- a/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/ConnectorConfigurationTest.java +++ b/common/src/test/java/software/amazon/s3/analyticsaccelerator/common/ConnectorConfigurationTest.java @@ -264,6 +264,45 @@ void testGetRequiredStringThrowsIfNotSet() { IllegalArgumentException.class, () -> configuration.getRequiredString("stringConfig1")); } + @Test + void testGetPositiveLong() { + ConnectorConfiguration configuration = getDefaultConfiguration(TEST_PREFIX); + long positiveLong = configuration.getPositiveLong("longConfig", 5); + assertEquals(1, positiveLong); + } + + @Test + void testGetPositiveLongWithDefault() { + ConnectorConfiguration configuration = getDefaultConfiguration(TEST_PREFIX); + long positiveLong = configuration.getPositiveLong("nonExistentKey", 10); + assertEquals(10, positiveLong); + } + + @Test + void testGetPositiveLongThrowsOnZero() { + Map configMap = new HashMap<>(); + configMap.put(TEST_PREFIX + ".zeroConfig", "0"); + ConnectorConfiguration configuration = new ConnectorConfiguration(configMap, TEST_PREFIX); + assertThrows( + IllegalArgumentException.class, () -> configuration.getPositiveLong("zeroConfig", 5)); + } + + @Test + void testGetPositiveLongThrowsOnNegative() { + Map configMap = new HashMap<>(); + configMap.put(TEST_PREFIX + ".negativeConfig", "-5"); + ConnectorConfiguration configuration = new 
ConnectorConfiguration(configMap, TEST_PREFIX); + assertThrows( + IllegalArgumentException.class, () -> configuration.getPositiveLong("negativeConfig", 5)); + } + + @Test + void testGetPositiveLongThrowsOnNegativeDefault() { + ConnectorConfiguration configuration = getDefaultConfiguration(TEST_PREFIX); + assertThrows( + IllegalArgumentException.class, () -> configuration.getPositiveLong("nonExistentKey", -1)); + } + private static ConnectorConfiguration getDefaultConfiguration(String prefix) { return new ConnectorConfiguration(getDefaultConfigurationMap(prefix), prefix); diff --git a/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java b/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java index cc6d5b645..c35aa1bf4 100644 --- a/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java +++ b/input-stream/src/integrationTest/java/software/amazon/s3/analyticsaccelerator/access/ReadVectoredTest.java @@ -48,26 +48,33 @@ public class ReadVectoredTest extends IntegrationTestBase { @ParameterizedTest @MethodSource("vectoredReads") - void testVectoredReads(S3ClientKind s3ClientKind, IntFunction allocate) + void testVectoredReads( + S3ClientKind s3ClientKind, + IntFunction allocate, + AALInputStreamConfigurationKind configurationKind) throws IOException { - testReadVectored( - s3ClientKind, S3Object.RANDOM_1GB, AALInputStreamConfigurationKind.DEFAULT, allocate); + testReadVectored(s3ClientKind, S3Object.RANDOM_1GB, configurationKind, allocate); } @ParameterizedTest @MethodSource("vectoredReads") - void testVectoredReadsInSingleBlock(S3ClientKind s3ClientKind, IntFunction allocate) + void testVectoredReadsInSingleBlock( + S3ClientKind s3ClientKind, + IntFunction allocate, + AALInputStreamConfigurationKind configurationKind) throws IOException { - testReadVectoredInSingleBlock( - s3ClientKind, S3Object.RANDOM_1GB, 
AALInputStreamConfigurationKind.DEFAULT, allocate); + testReadVectoredInSingleBlock(s3ClientKind, S3Object.RANDOM_1GB, configurationKind, allocate); } @ParameterizedTest @MethodSource("vectoredReads") void testVectoredReadsForSequentialRanges( - S3ClientKind s3ClientKind, IntFunction allocate) throws IOException { + S3ClientKind s3ClientKind, + IntFunction allocate, + AALInputStreamConfigurationKind configurationKind) + throws IOException { testReadVectoredForSequentialRanges( - s3ClientKind, S3Object.RANDOM_1GB, AALInputStreamConfigurationKind.DEFAULT, allocate); + s3ClientKind, S3Object.RANDOM_1GB, configurationKind, allocate); } @Test @@ -281,13 +288,18 @@ static Stream vectoredReads() { s3ClientKinds.add(S3ClientKind.SDK_V2_JAVA_ASYNC); s3ClientKinds.add(S3ClientKind.SDK_V2_JAVA_SYNC); + List configurationKinds = new ArrayList<>(); + configurationKinds.add(AALInputStreamConfigurationKind.DEFAULT); + configurationKinds.add(AALInputStreamConfigurationKind.NO_REQUEST_COALESCING); + List> allocate = new ArrayList<>(); allocate.add(ByteBuffer::allocate); allocate.add(ByteBuffer::allocateDirect); for (S3ClientKind s3ClientKind : s3ClientKinds) { for (IntFunction allocator : allocate) { - testCases.add(Arguments.of(s3ClientKind, allocator)); + for (AALInputStreamConfigurationKind configurationKind : configurationKinds) + testCases.add(Arguments.of(s3ClientKind, allocator, configurationKind)); } } @@ -300,26 +312,26 @@ static Stream vectoredReads() { * * @param s3ClientKind S3 client kind to use * @param s3Object S3 object to read - * @param AALInputStreamConfigurationKind configuration kind + * @param configurationKind configuration kind * @param allocate method to allocate the buffer, can be direct or non-direct * @throws IOException on any IOException */ protected void testReadVectored( @NonNull S3ClientKind s3ClientKind, @NonNull S3Object s3Object, - @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, + @NonNull AALInputStreamConfigurationKind 
configurationKind, @NonNull IntFunction allocate) throws IOException { try (S3AALClientStreamReader s3AALClientStreamReader = - getStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { + getStreamReader(s3ClientKind, configurationKind)) { S3SeekableInputStream s3SeekableInputStream = s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.ofDefaults()); List objectRanges = new ArrayList<>(); objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, ONE_MB)); - objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2 * ONE_MB, 800)); + objectRanges.add(new ObjectRange(new CompletableFuture<>(), 3 * ONE_MB, 800)); // a range that should be within a single block objectRanges.add(new ObjectRange(new CompletableFuture<>(), 200 * ONE_MB, 8 * ONE_MB)); @@ -340,7 +352,8 @@ protected void testReadVectored( objectRange.getByteBuffer().join(); } - // Range [50MB - 51MB, 2MB - 2.8MB] will make 2 GET requests + // Range [50 - 1MB, 3MB - 3.8MB] will make 2 GET requests -- will not be coalesced with 1MB + // tolerance // Range [200MB - 208MB] will make a single GET as it is an 8MB block. 
// Range [260MB - 284MB] will make 3 GET requests assertEquals( @@ -357,12 +370,12 @@ protected void testReadVectored( protected void testReadVectoredInSingleBlock( @NonNull S3ClientKind s3ClientKind, @NonNull S3Object s3Object, - @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, + @NonNull AALInputStreamConfigurationKind configurationKind, @NonNull IntFunction allocate) throws IOException { try (S3AALClientStreamReader s3AALClientStreamReader = - this.getStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { + this.getStreamReader(s3ClientKind, configurationKind)) { S3SeekableInputStream s3SeekableInputStream = s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.ofDefaults()); @@ -391,12 +404,12 @@ protected void testReadVectoredInSingleBlock( protected void testReadVectoredForSequentialRanges( @NonNull S3ClientKind s3ClientKind, @NonNull S3Object s3Object, - @NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind, + @NonNull AALInputStreamConfigurationKind configurationKind, @NonNull IntFunction allocate) throws IOException { try (S3AALClientStreamReader s3AALClientStreamReader = - this.getStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) { + this.getStreamReader(s3ClientKind, configurationKind)) { S3SeekableInputStream s3SeekableInputStream = s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.ofDefaults()); @@ -416,12 +429,18 @@ protected void testReadVectoredForSequentialRanges( objectRange.getByteBuffer().join(); } + int expectedRequests = + (configurationKind == AALInputStreamConfigurationKind.NO_REQUEST_COALESCING) + ? 
objectRanges.size() + : 3; + assertEquals( - 5, + expectedRequests, s3AALClientStreamReader .getS3SeekableInputStreamFactory() .getMetrics() .get(MetricKey.GET_REQUEST_COUNT)); + verifyStreamContents(objectRanges, s3AALClientStreamReader, s3Object); } } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index ceedc0645..ceb36f349 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -171,7 +171,13 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati PhysicalIO createPhysicalIO(S3URI s3URI, OpenStreamInformation openStreamInformation) throws IOException { return new PhysicalIOImpl( - s3URI, objectMetadataStore, objectBlobStore, telemetry, openStreamInformation, threadPool); + s3URI, + objectMetadataStore, + objectBlobStore, + telemetry, + openStreamInformation, + threadPool, + configuration.getPhysicalIOConfiguration()); } void storeObjectMetadata(S3URI s3URI, ObjectMetadata metadata) { diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java index a981074fb..2a970d9e8 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java @@ -50,6 +50,8 @@ public class PhysicalIOConfiguration { private static final long DEFAULT_READ_BUFFER_SIZE = 128 * ONE_KB; private static final long DEFAULT_TARGET_REQUEST_SIZE = 8 * ONE_MB; private static final double 
DEFAULT_REQUEST_TOLERANCE_RATIO = 1.4; + private static final boolean DEFAULT_COALESCE_REQUEST = true; + private static final long DEFAULT_COALESCE_REQUEST_TOLERANCE = ONE_MB; /** * Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_MEMORY_CAPACITY_BYTES} by default. @@ -175,6 +177,16 @@ public class PhysicalIOConfiguration { private static final String REQUEST_TOLERANCE_RATIO_KEY = "request.tolerance.ratio"; + /** Flag to enable request Coalescing */ + @Builder.Default private boolean requestCoalesce = DEFAULT_COALESCE_REQUEST; + + private static final String REQUEST_COALESCE_KEY = "request.coalesce"; + + /** Number of bytes to read and swallow when merging two requests */ + @Builder.Default private long requestCoalesceTolerance = DEFAULT_COALESCE_REQUEST_TOLERANCE; + + private static final String REQUEST_COALESCE_TOLERANCE_KEY = "request.coalesce.tolerance"; + /** Default set of settings for {@link PhysicalIO} */ public static final PhysicalIOConfiguration DEFAULT = PhysicalIOConfiguration.builder().build(); @@ -225,6 +237,10 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c configuration.getLong(TARGET_REQUEST_SIZE_KEY, DEFAULT_TARGET_REQUEST_SIZE)) .requestToleranceRatio( configuration.getDouble(REQUEST_TOLERANCE_RATIO_KEY, DEFAULT_REQUEST_TOLERANCE_RATIO)) + .requestCoalesce(configuration.getBoolean(REQUEST_COALESCE_KEY, DEFAULT_COALESCE_REQUEST)) + .requestCoalesceTolerance( + configuration.getLong( + REQUEST_COALESCE_TOLERANCE_KEY, DEFAULT_COALESCE_REQUEST_TOLERANCE)) .build(); } @@ -251,6 +267,9 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c * @param readBufferSize Size of the maximum buffer for read operations * @param targetRequestSize Target S3 request size, in bytes * @param requestToleranceRatio Request tolerance ratio + * @param requestCoalesce Flag to enable request Coalescing + * @param requestCoalesceTolerance Number of bytes to read and swallow when merging two requests 
+ * @throws IllegalArgumentException if any of the parameters are invalid */ @Builder private PhysicalIOConfiguration( @@ -271,7 +290,9 @@ private PhysicalIOConfiguration( int threadPoolSize, long readBufferSize, long targetRequestSize, - double requestToleranceRatio) { + double requestToleranceRatio, + boolean requestCoalesce, + long requestCoalesceTolerance) { Preconditions.checkArgument(memoryCapacityBytes > 0, "`memoryCapacityBytes` must be positive"); Preconditions.checkArgument( memoryCleanupFrequencyMilliseconds > 0, @@ -319,6 +340,8 @@ private PhysicalIOConfiguration( this.readBufferSize = readBufferSize; this.targetRequestSize = targetRequestSize; this.requestToleranceRatio = requestToleranceRatio; + this.requestCoalesce = requestCoalesce; + this.requestCoalesceTolerance = requestCoalesceTolerance; } @Override @@ -345,7 +368,8 @@ public String toString() { builder.append("\treadBufferSize: " + readBufferSize + "\n"); builder.append("\ttargetRequestSize: " + targetRequestSize + "\n"); builder.append("\trequestToleranceRatio: " + requestToleranceRatio + "\n"); - + builder.append("\trequestCoalesce: " + requestCoalesce + "\n"); + builder.append("\trequestCoalesceTolerance: " + requestCoalesceTolerance + "\n"); return builder.toString(); } } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index bb332184a..146c2162a 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -31,6 +31,7 @@ import software.amazon.s3.analyticsaccelerator.common.telemetry.Operation; import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO; +import 
software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.physical.data.Blob; import software.amazon.s3.analyticsaccelerator.io.physical.data.BlobStore; import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; @@ -53,6 +54,7 @@ public class PhysicalIOImpl implements PhysicalIO { private ObjectKey objectKey; private final ObjectMetadata metadata; private final ExecutorService threadPool; + private final PhysicalIOConfiguration configuration; private final long physicalIOBirth = System.nanoTime(); @@ -73,6 +75,7 @@ public class PhysicalIOImpl implements PhysicalIO { * @param telemetry The {@link Telemetry} to use to report measurements. * @param openStreamInformation contains stream information * @param threadPool Thread pool for async operations + * @param configuration PhysicalIO Configuration */ public PhysicalIOImpl( @NonNull S3URI s3URI, @@ -80,7 +83,8 @@ public PhysicalIOImpl( @NonNull BlobStore blobStore, @NonNull Telemetry telemetry, @NonNull OpenStreamInformation openStreamInformation, - @NonNull ExecutorService threadPool) + @NonNull ExecutorService threadPool, + @NonNull PhysicalIOConfiguration configuration) throws IOException { this.metadataStore = metadataStore; this.blobStore = blobStore; @@ -89,6 +93,7 @@ public PhysicalIOImpl( this.metadata = this.metadataStore.get(s3URI, openStreamInformation); this.objectKey = ObjectKey.builder().s3URI(s3URI).etag(metadata.getEtag()).build(); this.threadPool = threadPool; + this.configuration = configuration; } /** @@ -213,10 +218,14 @@ public int readTail(byte[] buf, int off, int len) throws IOException { * Async method capable of executing a logical IO plan. 
* * @param ioPlan the plan to execute asynchronously + * @param readMode the read mode for which this IoPlan is being executed * @return an IOPlanExecution object tracking the execution of the submitted plan */ @Override public IOPlanExecution execute(IOPlan ioPlan, ReadMode readMode) { + if (readMode.coalesceRequests() && configuration.isRequestCoalesce()) { + ioPlan.coalesce(configuration.getRequestCoalesceTolerance()); + } return telemetry.measureVerbose( () -> Operation.builder() diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/plan/IOPlan.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/plan/IOPlan.java index 61324bc6d..df3545016 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/plan/IOPlan.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/plan/IOPlan.java @@ -58,4 +58,37 @@ public String toString() { + this.prefetchRanges.stream().map(Range::toString).collect(Collectors.joining(",")) + "]"; } + + /** + * Sorts the prefetch ranges and merges overlapping or nearby ones. Done in-place. + * + * @param tolerance two ranges merge when (next start - current end) is at most this value + */ + public void coalesce(long tolerance) { + if (this.prefetchRanges.size() < 2) { + return; + } + // Ensure ranges are ordered by their start position. 
+ Collections.sort(this.prefetchRanges); + int writeIndex = 0; + Range currentRange = this.prefetchRanges.get(0); + for (int i = 1; i < this.prefetchRanges.size(); i++) { + Range nextRange = this.prefetchRanges.get(i); + // Always compare with the last checked interval to avoid n^2 comparisons + // Subtraction instead of addition: a huge tolerance must not overflow the comparison. + if (nextRange.getStart() - currentRange.getEnd() <= tolerance) { + currentRange = + new Range(currentRange.getStart(), Math.max(currentRange.getEnd(), nextRange.getEnd())); + } else { + this.prefetchRanges.set(writeIndex++, currentRange); + currentRange = nextRange; + } + } + this.prefetchRanges.set(writeIndex++, currentRange); + // writeIndex now shows how many coalesced ranges added to the list. + // We should remove residue ones from the original list. + while (this.prefetchRanges.size() > writeIndex) { + this.prefetchRanges.remove(this.prefetchRanges.size() - 1); + } + } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java index 56de39b64..23a296e49 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java @@ -393,7 +393,8 @@ void testMultiThreadUsage() throws IOException, InterruptedException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); LogicalIO logicalIO = new ParquetLogicalIOImpl( TEST_OBJECT, @@ -592,7 +593,8 @@ private S3SeekableInputStream getTestStreamWithContent(String content, S3URI s3U blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService), + executorService, + PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, 
LogicalIOConfiguration.DEFAULT, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)), @@ -622,7 
+624,8 @@ public void testStreamMetadataConsistencyAfterTtlExpiry() blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService), + executorService, + PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java index 3ecba6a84..e17d20f90 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java @@ -72,7 +72,8 @@ public class S3SeekableInputStreamTestBase { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService), + executorService, + PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, logicalIOConfiguration, new ParquetColumnPrefetchStore(logicalIOConfiguration)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java index 959b3576e..d51450e3c 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java @@ -158,7 +158,8 @@ void testMetadaWithZeroContentLength() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - mock(ExecutorService.class)); + mock(ExecutorService.class), + PhysicalIOConfiguration.DEFAULT); assertDoesNotThrow( () -> new ParquetLogicalIOImpl( diff --git 
a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java index d8ea7e7d5..93d14fc8d 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java @@ -80,6 +80,8 @@ void testToString() { + "\tthreadPoolSize: 96\n" + "\treadBufferSize: 131072\n" + "\ttargetRequestSize: 20\n" - + "\trequestToleranceRatio: 1.4\n"); + + "\trequestToleranceRatio: 1.4\n" + + "\trequestCoalesce: true\n" + + "\trequestCoalesceTolerance: 1048576\n"); } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java index 02f93b989..d740a1052 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java @@ -72,7 +72,8 @@ void testConstructorThrowsOnNullArgument() { mock(BlobStore.class), TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -84,7 +85,8 @@ void testConstructorThrowsOnNullArgument() { mock(BlobStore.class), TestTelemetry.DEFAULT, mock(OpenStreamInformation.class), - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -96,7 +98,8 @@ void testConstructorThrowsOnNullArgument() { null, TestTelemetry.DEFAULT, mock(OpenStreamInformation.class), - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -108,7 +111,8 @@ void 
testConstructorThrowsOnNullArgument() { mock(BlobStore.class), null, mock(OpenStreamInformation.class), - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -120,7 +124,8 @@ void testConstructorThrowsOnNullArgument() { mock(BlobStore.class), TestTelemetry.DEFAULT, null, - null); + null, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -132,7 +137,8 @@ void testConstructorThrowsOnNullArgument() { mock(BlobStore.class), TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -144,7 +150,8 @@ void testConstructorThrowsOnNullArgument() { null, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -156,7 +163,8 @@ void testConstructorThrowsOnNullArgument() { mock(BlobStore.class), null, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); }); assertThrows( @@ -168,7 +176,8 @@ void testConstructorThrowsOnNullArgument() { mock(BlobStore.class), TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - null); + null, + PhysicalIOConfiguration.DEFAULT); }); } @@ -197,7 +206,8 @@ public void test__readSingleByte_isCorrect() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); // When: we read // Then: returned data is correct @@ -233,7 +243,8 @@ public void test__regression_singleByteStream() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); // When: we read // Then: returned data is correct @@ -264,7 +275,8 @@ void testReadWithBuffer() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + 
PhysicalIOConfiguration.DEFAULT); byte[] buffer = new byte[5]; assertEquals(5, physicalIOImplV2.read(buffer, 0, 5, 5)); @@ -294,7 +306,8 @@ void testReadTail() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); byte[] buffer = new byte[5]; assertEquals(5, physicalIOImplV2.readTail(buffer, 0, 5)); assertEquals(1, blobStore.blobCount()); @@ -341,7 +354,8 @@ public void test_FailureEvictsObjectsAsExpected() throws IOException, Interrupte blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); assertThrows(IOException.class, () -> physicalIOImplV2.read(0)); assertEquals(0, blobStore.blobCount()); @@ -383,7 +397,8 @@ public void test_FailureEvictsObjectsAsExpected_WhenSDKClientGetsStuck() throws blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); assertThrows(IOException.class, () -> physicalIOImplV2.read(0)); assertEquals(0, blobStore.blobCount()); @@ -416,7 +431,8 @@ void testClose_WithoutEviction() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); // When: Read data to ensure blob is created byte[] buffer = new byte[4]; @@ -459,7 +475,8 @@ void testCloseWithEviction() throws IOException { mockBlobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); ObjectKey objectKey = ObjectKey.builder().s3URI(s3URI).etag(fakeObjectClient.getEtag()).build(); // When physicalIO.close(true); @@ -493,7 +510,8 @@ void testPartialRead() throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); // When: Read partial 
data byte[] buffer = new byte[4]; @@ -538,7 +556,8 @@ private void readVectored(IntFunction allocate) throws IOException { blobStore, TestTelemetry.DEFAULT, OpenStreamInformation.DEFAULT, - executorService); + executorService, + PhysicalIOConfiguration.DEFAULT); List objectRanges = new ArrayList<>(); objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2, 3)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/plan/IOPlanTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/plan/IOPlanTest.java index ebf58bc0a..61f38b218 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/plan/IOPlanTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/plan/IOPlanTest.java @@ -68,4 +68,89 @@ void testRangeToString() { void testEmptyPlanToString() { assertEquals("[]", IOPlan.EMPTY_PLAN.toString()); } + + @Test + void testCoalesceOverlappingRanges() { + ArrayList ranges = new ArrayList<>(); + ranges.add(new Range(1, 5)); + ranges.add(new Range(3, 8)); + IOPlan ioPlan = new IOPlan(ranges); + ioPlan.coalesce(0); + assertEquals(1, ioPlan.getPrefetchRanges().size()); + assertEquals(new Range(1, 8), ioPlan.getPrefetchRanges().get(0)); + } + + @Test + void testCoalesceAdjacentRanges() { + ArrayList ranges = new ArrayList<>(); + ranges.add(new Range(1, 5)); + ranges.add(new Range(5, 10)); + IOPlan ioPlan = new IOPlan(ranges); + ioPlan.coalesce(0); + assertEquals(1, ioPlan.getPrefetchRanges().size()); + assertEquals(new Range(1, 10), ioPlan.getPrefetchRanges().get(0)); + } + + @Test + void testCoalesceWithTolerance() { + ArrayList ranges = new ArrayList<>(); + ranges.add(new Range(1, 5)); + ranges.add(new Range(7, 12)); + IOPlan ioPlan = new IOPlan(ranges); + ioPlan.coalesce(2); + assertEquals(1, ioPlan.getPrefetchRanges().size()); + assertEquals(new Range(1, 12), ioPlan.getPrefetchRanges().get(0)); + } + + @Test + void testCoalesceNoMerge() { + ArrayList ranges = new 
ArrayList<>(); + ranges.add(new Range(1, 5)); + ranges.add(new Range(10, 15)); + IOPlan ioPlan = new IOPlan(ranges); + ioPlan.coalesce(0); + assertEquals(2, ioPlan.getPrefetchRanges().size()); + } + + @Test + void testCoalesceSingleRange() { + IOPlan ioPlan = new IOPlan(new Range(1, 5)); + ioPlan.coalesce(0); + assertEquals(1, ioPlan.getPrefetchRanges().size()); + assertEquals(new Range(1, 5), ioPlan.getPrefetchRanges().get(0)); + } + + @Test + void testCoalesceEmptyPlan() { + IOPlan ioPlan = new IOPlan(new ArrayList<>()); + ioPlan.coalesce(0); + assertEquals(0, ioPlan.getPrefetchRanges().size()); + } + + @Test + void testCoalesceMultipleRanges() { + ArrayList ranges = new ArrayList<>(); + ranges.add(new Range(1, 3)); + ranges.add(new Range(4, 7)); + ranges.add(new Range(9, 11)); + ranges.add(new Range(12, 15)); + IOPlan ioPlan = new IOPlan(ranges); + ioPlan.coalesce(1); + assertEquals(2, ioPlan.getPrefetchRanges().size()); + assertEquals(new Range(1, 7), ioPlan.getPrefetchRanges().get(0)); + assertEquals(new Range(9, 15), ioPlan.getPrefetchRanges().get(1)); + } + + @Test + void testCoalesceUnorderedRanges() { + ArrayList ranges = new ArrayList<>(); + ranges.add(new Range(10, 15)); + ranges.add(new Range(1, 5)); + ranges.add(new Range(6, 8)); + IOPlan ioPlan = new IOPlan(ranges); + ioPlan.coalesce(1); + assertEquals(2, ioPlan.getPrefetchRanges().size()); + assertEquals(new Range(1, 8), ioPlan.getPrefetchRanges().get(0)); + assertEquals(new Range(10, 15), ioPlan.getPrefetchRanges().get(1)); + } } diff --git a/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java b/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java index c71a8e1db..40924c020 100644 --- a/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java +++ 
b/input-stream/src/testFixtures/java/software/amazon/s3/analyticsaccelerator/access/AALInputStreamConfigurationKind.java @@ -30,7 +30,8 @@ public enum AALInputStreamConfigurationKind { GRAY_FAILURE("GRAY_FAILURE", grayFailureConfiguration()), READ_CORRECTNESS("READ_CORRECTNESS", readCorrectnessConfiguration()), CONCURRENCY_CORRECTNESS("CONCURRENCY_CORRECTNESS", concurrencyCorrectnessConfiguration()), - NO_RETRY("NO_RETRY", noRetryConfiguration()); + NO_RETRY("NO_RETRY", noRetryConfiguration()), + NO_REQUEST_COALESCING("NO_COALESCING", noRequestCoalescing()); private final String name; private final S3SeekableInputStreamConfiguration value; @@ -79,6 +80,15 @@ private static S3SeekableInputStreamConfiguration concurrencyCorrectnessConfigur return S3SeekableInputStreamConfiguration.fromConfiguration(config); } + private static S3SeekableInputStreamConfiguration noRequestCoalescing() { + String configurationPrefix = "noCoalescing"; + Map customConfiguration = new HashMap<>(); + customConfiguration.put(configurationPrefix + ".physicalio.request.coalesce", "false"); + ConnectorConfiguration config = + new ConnectorConfiguration(customConfiguration, configurationPrefix); + return S3SeekableInputStreamConfiguration.fromConfiguration(config); + } + private static String getMemoryCapacity() { long maxHeapBytes = Runtime.getRuntime().maxMemory(); double percentage = 0.0000001;