diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java index e591c419..2f4d03e3 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java @@ -19,11 +19,15 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import lombok.Getter; import lombok.NonNull; import software.amazon.s3.analyticsaccelerator.common.Metrics; import software.amazon.s3.analyticsaccelerator.common.Preconditions; +import software.amazon.s3.analyticsaccelerator.retry.RetryPolicy; +import software.amazon.s3.analyticsaccelerator.retry.RetryStrategy; +import software.amazon.s3.analyticsaccelerator.retry.SeekableInputStreamRetryStrategy; import software.amazon.s3.analyticsaccelerator.util.BlockKey; import software.amazon.s3.analyticsaccelerator.util.MetricKey; @@ -49,6 +53,9 @@ public class Block implements Closeable { private final BlobStoreIndexCache indexCache; private final Metrics aggregatingMetrics; private final long readTimeout; + private final int retryCount; + private final RetryStrategy retryStrategy; + /** * A synchronization aid that allows threads to wait until the block's data is available. * @@ -75,13 +82,15 @@ public class Block implements Closeable { * @param indexCache blobstore index cache * @param aggregatingMetrics blobstore metrics * @param readTimeout read timeout in milliseconds + * @param retryCount number of retries */ public Block( @NonNull BlockKey blockKey, long generation, @NonNull BlobStoreIndexCache indexCache, @NonNull Metrics aggregatingMetrics, - long readTimeout) { + long readTimeout, + int retryCount) { Preconditions.checkArgument( 0 <= generation, "`generation` must be non-negative; was: %s", generation); @@ -90,6 +99,27 @@ public Block( this.indexCache = indexCache; this.aggregatingMetrics = aggregatingMetrics; this.readTimeout = readTimeout; + this.retryCount = retryCount; + this.retryStrategy = createRetryStrategy(); + } + + /** + * Helper to construct retryStrategy + * + * @return a {@link RetryStrategy} to retry when timeouts are set + * @throws RuntimeException if all retries fails and an error occurs + */ + @SuppressWarnings("unchecked") + private RetryStrategy createRetryStrategy() { + if (this.readTimeout > 0) { + RetryPolicy timeoutRetries = + RetryPolicy.builder() + .handle(InterruptedException.class, TimeoutException.class, IOException.class) + .withMaxRetries(this.retryCount) + .build(); + return new SeekableInputStreamRetryStrategy<>(timeoutRetries); + } + return new SeekableInputStreamRetryStrategy<>(); } /** @@ -101,7 +131,7 @@ public Block( */ public int read(long pos) throws IOException { Preconditions.checkArgument(0 <= pos, "`pos` must not be negative"); - awaitData(); + awaitDataWithRetry(); indexCache.recordAccess(this.blockKey); int contentOffset = posToOffset(pos); return Byte.toUnsignedInt(this.data[contentOffset]); @@ -124,7 +154,7 @@ public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOExcep Preconditions.checkArgument(0 <= len, "`len` must not be negative"); Preconditions.checkArgument(off < buf.length, "`off` must be less than size of buffer"); - awaitData(); + awaitDataWithRetry(); indexCache.recordAccess(this.blockKey); int contentOffset = posToOffset(pos); @@ -168,6 +198,14 @@ public void setData(final byte[] data) { dataReadyLatch.countDown(); } + private void awaitDataWithRetry() throws IOException { + this.retryStrategy.get( + () -> { + awaitData(); + return null; + }); + } + /** * Waits for the block's data to become available. This method blocks until {@link * #setData(byte[])} is called. @@ -178,13 +216,16 @@ private void awaitData() throws IOException { try { if (!dataReadyLatch.await(readTimeout, TimeUnit.MILLISECONDS)) { throw new IOException( - "Failed to read data", new IOException("Request timed out to fill the block")); + "Error while reading data. Request timed out after " + + readTimeout + + "ms while waiting for block data"); } } catch (InterruptedException e) { - throw new IOException("Failed to read data", e); + throw new IOException("Error while reading data. Read interrupted while waiting for data", e); } - if (data == null) throw new IOException("Failed to read data"); + if (data == null) + throw new IOException("Error while reading data. Block data is null after successful await"); } /** diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index aca16b50..d923bf90 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -100,7 +100,8 @@ public BlockManager( this::removeBlocks, aggregatingMetrics, openStreamInformation, - telemetry); + telemetry, + configuration); this.sequentialReadProgression = new SequentialReadProgression(configuration); this.rangeOptimiser = new RangeOptimiser(configuration); @@ -213,7 +214,8 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod generation, this.indexCache, this.aggregatingMetrics, - this.configuration.getBlockReadTimeout()); + this.configuration.getBlockReadTimeout(), + this.configuration.getBlockReadRetryCount()); // Add block to the store for future reference blockStore.add(block); blocksToFill.add(block); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index 2b1f01f2..e84f4acf 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -314,13 +314,30 @@ private void makeReadVectoredRangesAvailable(List objectRanges) { execute(new IOPlan(ranges), ReadMode.READ_VECTORED); } + /** + * Handles exceptions from operations by evicting keys from caches when appropriate. + * + * @param e The exception to handle + */ private void handleOperationExceptions(Exception e) { + boolean shouldEvict = false; + + // Check for IO errors while reading data + if (e instanceof IOException + && e.getMessage() != null + && e.getMessage().contains("Error while reading data.")) { + shouldEvict = true; + } + + // Check for precondition failed errors (412) if (e.getCause() != null && e.getCause().getMessage() != null - && (e.getCause().getMessage().contains("Status Code: 412") - || e.getCause().getMessage().contains("Error while getting block") - || e.getCause().getMessage().contains("Failed to read data") - || e.getCause().getMessage().contains("Request timed out to fill the block"))) { + && e.getCause().getMessage().contains("Status Code: 412")) { + shouldEvict = true; + } + + // Evict keys if needed + if (shouldEvict) { try { metadataStore.evictKey(this.objectKey.getS3URI()); } finally { diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java index 77481451..51dbed2b 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.stream.Collectors; import lombok.NonNull; @@ -31,6 +33,7 @@ import software.amazon.s3.analyticsaccelerator.common.Preconditions; import software.amazon.s3.analyticsaccelerator.common.telemetry.Operation; import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; +import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.physical.data.Block; import software.amazon.s3.analyticsaccelerator.request.GetRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; @@ -38,6 +41,9 @@ import software.amazon.s3.analyticsaccelerator.request.Range; import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.request.Referrer; +import software.amazon.s3.analyticsaccelerator.retry.RetryPolicy; +import software.amazon.s3.analyticsaccelerator.retry.RetryStrategy; +import software.amazon.s3.analyticsaccelerator.retry.SeekableInputStreamRetryStrategy; import software.amazon.s3.analyticsaccelerator.util.MetricKey; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; @@ -59,7 +65,11 @@ public class StreamReader implements Closeable { private final Metrics aggregatingMetrics; private final OpenStreamInformation openStreamInformation; private final Telemetry telemetry; + private final PhysicalIOConfiguration physicalIOConfiguration; + private final RetryStrategy retryStrategy; + + private static final String OPERATION_GET_OBJECT = "s3.stream.get"; private static final String OPERATION_STREAM_READ = "s3.stream.read"; private static final Logger LOG = LoggerFactory.getLogger(StreamReader.class); @@ -74,6 +84,7 @@ public class StreamReader implements Closeable { * @param aggregatingMetrics the metrics aggregator for performance or usage monitoring * @param openStreamInformation contains stream information * @param telemetry an instance of {@link Telemetry} to use + * @param physicalIOConfiguration an instance of {@link PhysicalIOConfiguration} to use */ public StreamReader( @NonNull ObjectClient objectClient, @@ -82,7 +93,8 @@ public StreamReader( @NonNull Consumer> removeBlocksFunc, @NonNull Metrics aggregatingMetrics, @NonNull OpenStreamInformation openStreamInformation, - @NonNull Telemetry telemetry) { + @NonNull Telemetry telemetry, + @NonNull PhysicalIOConfiguration physicalIOConfiguration) { this.objectClient = objectClient; this.objectKey = objectKey; this.threadPool = threadPool; @@ -90,6 +102,27 @@ public StreamReader( this.aggregatingMetrics = aggregatingMetrics; this.openStreamInformation = openStreamInformation; this.telemetry = telemetry; + this.physicalIOConfiguration = physicalIOConfiguration; + this.retryStrategy = createRetryStrategy(); + } + + /** + * Helper to construct retryStrategy + * + * @return a {@link RetryStrategy} to retry when timeouts are set + * @throws RuntimeException if all retries fails and an error occurs + */ + @SuppressWarnings("unchecked") + private RetryStrategy createRetryStrategy() { + if (this.physicalIOConfiguration.getBlockReadTimeout() > 0) { + RetryPolicy timeoutRetries = + RetryPolicy.builder() + .handle(InterruptedException.class, TimeoutException.class, ExecutionException.class) + .withMaxRetries(this.physicalIOConfiguration.getBlockReadRetryCount()) + .build(); + return new SeekableInputStreamRetryStrategy<>(timeoutRetries); + } + return new SeekableInputStreamRetryStrategy<>(); } /** @@ -121,58 +154,69 @@ public void read(@NonNull final List blocks, ReadMode readMode) { * @return a Runnable that executes the read operation asynchronously */ private Runnable processReadTask(final List blocks, ReadMode readMode) { - return () -> { - this.telemetry.measureCritical( - () -> - Operation.builder() - .name(OPERATION_STREAM_READ) - .attribute(StreamAttributes.uri(this.objectKey.getS3URI())) - .attribute(StreamAttributes.etag(this.objectKey.getEtag())) - .attribute( - StreamAttributes.effectiveRange( - blocks.get(0).getBlockKey().getRange().getStart(), - blocks.get(blocks.size() - 1).getBlockKey().getRange().getEnd())) - .build(), - () -> { - // Calculate the byte range needed to cover all blocks - Range requestRange = computeRange(blocks); + return () -> + this.telemetry.measureCritical( + () -> + Operation.builder() + .name(OPERATION_STREAM_READ) + .attribute(StreamAttributes.uri(this.objectKey.getS3URI())) + .attribute(StreamAttributes.etag(this.objectKey.getEtag())) + .attribute( + StreamAttributes.effectiveRange( + blocks.get(0).getBlockKey().getRange().getStart(), + blocks.get(blocks.size() - 1).getBlockKey().getRange().getEnd())) + .build(), + () -> { + // Calculate the byte range needed to cover all blocks + Range requestRange = computeRange(blocks); - // Build S3 GET request with range, ETag validation, and referrer info - GetRequest getRequest = - GetRequest.builder() - .s3Uri(objectKey.getS3URI()) - .range(requestRange) - .etag(objectKey.getEtag()) - .referrer(new Referrer(requestRange.toHttpString(), readMode)) - .build(); + // Build S3 GET request with range, ETag validation, and referrer info + GetRequest getRequest = + GetRequest.builder() + .s3Uri(objectKey.getS3URI()) + .range(requestRange) + .etag(objectKey.getEtag()) + .referrer(new Referrer(requestRange.toHttpString(), readMode)) + .build(); - openStreamInformation.getRequestCallback().onGetRequest(); + openStreamInformation.getRequestCallback().onGetRequest(); - // Fetch the object content from S3 - ObjectContent objectContent = fetchObjectContent(getRequest); + // Fetch the object content from S3 + ObjectContent objectContent; + try { + this.aggregatingMetrics.add(MetricKey.GET_REQUEST_COUNT, 1); + objectContent = fetchObjectContent(getRequest); + } catch (IOException e) { + LOG.error("IOException while fetching object content", e); + removeNonFilledBlocksFromStore(blocks); + return; + } - if (objectContent == null) { - // Couldn't successfully get the response from S3. - // Remove blocks from store and complete async operation - removeNonFilledBlocksFromStore(blocks); - return; - } + if (objectContent == null) { + // Couldn't successfully get the response from S3. + // Remove blocks from store and complete async operation + removeNonFilledBlocksFromStore(blocks); + return; + } - // Process the input stream and populate data blocks - try (InputStream inputStream = objectContent.getStream()) { - boolean success = readBlocksFromStream(inputStream, blocks, requestRange.getStart()); - if (!success) { + // Process the input stream and populate data blocks + try (InputStream inputStream = objectContent.getStream()) { + boolean success = + readBlocksFromStream(inputStream, blocks, requestRange.getStart()); + if (!success) { + removeNonFilledBlocksFromStore(blocks); + } + } catch (EOFException e) { + LOG.error("EOFException while reading blocks", e); + removeNonFilledBlocksFromStore(blocks); + } catch (IOException e) { + LOG.error("IOException while reading blocks", e); + removeNonFilledBlocksFromStore(blocks); + } catch (Exception e) { + LOG.error("Unexpected exception while reading blocks", e); removeNonFilledBlocksFromStore(blocks); } - } catch (EOFException e) { - LOG.error("EOFException while reading blocks", e); - removeNonFilledBlocksFromStore(blocks); - } catch (IOException e) { - LOG.error("IOException while reading blocks", e); - removeNonFilledBlocksFromStore(blocks); - } - }); - }; + }); } /** @@ -220,16 +264,20 @@ private Range computeRange(List blocks) { * @param getRequest the S3 GET request containing object URI, range, and ETag * @return the ObjectContent containing the S3 object data stream, or null if request fails */ - private ObjectContent fetchObjectContent(GetRequest getRequest) { - try { - - this.aggregatingMetrics.add(MetricKey.GET_REQUEST_COUNT, 1); - // Block on the async S3 request and return the result - return this.objectClient.getObject(getRequest, this.openStreamInformation).join(); - } catch (Exception e) { - LOG.error("Error while fetching object content", e); - return null; - } + private ObjectContent fetchObjectContent(GetRequest getRequest) throws IOException { + return this.retryStrategy.get( + () -> + telemetry.measureJoinCritical( + () -> + Operation.builder() + .name(OPERATION_GET_OBJECT) + .attribute(StreamAttributes.uri(this.objectKey.getS3URI())) + .attribute(StreamAttributes.uri(getRequest.getS3Uri())) + .attribute(StreamAttributes.rangeLength(getRequest.getRange().getLength())) + .attribute(StreamAttributes.range(getRequest.getRange())) + .build(), + this.objectClient.getObject(getRequest, this.openStreamInformation), + this.physicalIOConfiguration.getBlockReadTimeout())); } /** diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java index 3ebf6d7a..fc4bb89f 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java @@ -40,6 +40,7 @@ public class BlockStoreTest { private static final String ETAG = "RANDOM"; private static final ObjectKey objectKey = ObjectKey.builder().s3URI(TEST_URI).etag(ETAG).build(); private static final long DEFAULT_READ_TIMEOUT = 120_000; + private static final int DEFAULT_RETRY_COUNT = 20; private BlobStoreIndexCache mockIndexCache; private Metrics mockMetrics; @@ -91,7 +92,8 @@ public void test__blockStore__getBlockAfterAddBlock() { 0, mock(BlobStoreIndexCache.class), mock(Metrics.class), - DEFAULT_READ_TIMEOUT)); + DEFAULT_READ_TIMEOUT, + DEFAULT_RETRY_COUNT)); // Then: getBlock can retrieve the same block Optional b = blockStore.getBlock(4); @@ -145,7 +147,9 @@ public void test__blockStore__getBlockByIndex() { // Given: BlockStore with a block at a specific index BlockKey blockKey = new BlockKey(objectKey, new Range(8192, 16383)); // Assuming readBufferSize is 8KB - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block = + new Block( + blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); blockStore.add(block); // When: getBlockByIndex is called with the correct index @@ -181,8 +185,12 @@ public void test__blockStore__getBlock_negativePosition() { public void test__blockStore__add_duplicateBlock() { // Given: A block already in the store BlockKey blockKey = new BlockKey(objectKey, new Range(0, 8191)); - Block block1 = new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); - Block block2 = new Block(blockKey, 1, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block1 = + new Block( + blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); + Block block2 = + new Block( + blockKey, 1, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); // When: The first block is added blockStore.add(block1); @@ -203,7 +211,15 @@ public void test__blockStore__add_duplicateBlock() { public void test__blockStore__remove() throws IOException { // Given: A block in the store BlockKey blockKey = new BlockKey(objectKey, new Range(0, 4)); - Block block = spy(new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT)); + Block block = + spy( + new Block( + blockKey, + 0, + mockIndexCache, + mockMetrics, + DEFAULT_READ_TIMEOUT, + DEFAULT_RETRY_COUNT)); block.setData(new byte[] {1, 2, 3, 4, 5}); when(block.isDataReady()).thenReturn(true); blockStore.add(block); @@ -227,7 +243,9 @@ public void test__blockStore__remove() throws IOException { public void test__blockStore__remove_nonExistentBlock() { // Given: A block not in the store BlockKey blockKey = new BlockKey(objectKey, new Range(0, 8191)); - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block = + new Block( + blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); // When: An attempt is made to remove the block blockStore.remove(block); @@ -240,7 +258,15 @@ public void test__blockStore__remove_nonExistentBlock() { public void test__blockStore__remove_dataNotReady() throws IOException { // Given: A block in the store with data not ready BlockKey blockKey = new BlockKey(objectKey, new Range(0, 4)); - Block block = spy(new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT)); + Block block = + spy( + new Block( + blockKey, + 0, + mockIndexCache, + mockMetrics, + DEFAULT_READ_TIMEOUT, + DEFAULT_RETRY_COUNT)); when(block.isDataReady()).thenReturn(false); blockStore.add(block); @@ -264,8 +290,12 @@ public void test__blockStore__getMissingBlockIndexesInRange() { BlockKey blockKey1 = new BlockKey(objectKey, new Range(0, 8191)); // Index 0 BlockKey blockKey2 = new BlockKey(objectKey, new Range(16384, 24575)); // Index 2 - Block block1 = new Block(blockKey1, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); - Block block2 = new Block(blockKey2, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block1 = + new Block( + blockKey1, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); + Block block2 = + new Block( + blockKey2, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); blockStore.add(block1); blockStore.add(block2); @@ -323,7 +353,9 @@ public void test__blockStore__isEmpty() { // When: A block is added BlockKey blockKey = new BlockKey(objectKey, new Range(0, 8191)); - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block = + new Block( + blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); blockStore.add(block); // Then: isEmpty returns false @@ -349,7 +381,13 @@ public void test__blockStore__concurrentAddRemove() throws InterruptedException BlockKey blockKey = new BlockKey(objectKey, new Range(index * 8192L, (index + 1) * 8192L - 1)); Block block = - new Block(blockKey, index, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + new Block( + blockKey, + index, + mockIndexCache, + mockMetrics, + DEFAULT_READ_TIMEOUT, + DEFAULT_RETRY_COUNT); blockStore.add(block); blockStore.remove(block); latch.countDown(); @@ -366,7 +404,9 @@ public void test__blockStore__concurrentAddRemove() throws InterruptedException @Test public void test__blockStore__getBlock_atRangeBoundaries() { BlockKey blockKey = new BlockKey(objectKey, new Range(0, 8191)); - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block = + new Block( + blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); blockStore.add(block); // At start of range @@ -396,7 +436,9 @@ public void test__blockStore__remove_nullBlock() { @Test public void test__blockStore__getBlock_positionOutsideAnyBlock() { BlockKey blockKey = new BlockKey(objectKey, new Range(0, 8191)); - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT); + Block block = + new Block( + blockKey, 0, mockIndexCache, mockMetrics, DEFAULT_READ_TIMEOUT, DEFAULT_RETRY_COUNT); blockStore.add(block); Optional outsideBlock = blockStore.getBlock(100_000); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java index c42ce2c5..ae8681bb 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.s3.analyticsaccelerator.common.Metrics; @@ -39,6 +40,7 @@ public class BlockTest { private static final String TEST_DATA = "test-data"; private static final byte[] TEST_DATA_BYTES = TEST_DATA.getBytes(StandardCharsets.UTF_8); private static final long READ_TIMEOUT = 5_000; + private static final int RETRY_COUNT = 2; private ObjectKey objectKey; private BlockKey blockKey; @@ -55,7 +57,7 @@ void setUp() { @Test void testConstructorWithValidParameters() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); assertNotNull(block); assertEquals(blockKey, block.getBlockKey()); @@ -67,27 +69,28 @@ void testConstructorWithValidParameters() { void testConstructorWithNullBlockKey() { assertThrows( NullPointerException.class, - () -> new Block(null, 0, mockIndexCache, mockMetrics, READ_TIMEOUT)); + () -> new Block(null, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT)); } @Test void testConstructorWithNullIndexCache() { assertThrows( - NullPointerException.class, () -> new Block(blockKey, 0, null, mockMetrics, READ_TIMEOUT)); + NullPointerException.class, + () -> new Block(blockKey, 0, null, mockMetrics, READ_TIMEOUT, RETRY_COUNT)); } @Test void testConstructorWithNullMetrics() { assertThrows( NullPointerException.class, - () -> new Block(blockKey, 0, mockIndexCache, null, READ_TIMEOUT)); + () -> new Block(blockKey, 0, mockIndexCache, null, READ_TIMEOUT, RETRY_COUNT)); } @Test void testConstructorWithNegativeGeneration() { assertThrows( IllegalArgumentException.class, - () -> new Block(blockKey, -1, mockIndexCache, mockMetrics, READ_TIMEOUT)); + () -> new Block(blockKey, -1, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT)); } @Test @@ -96,7 +99,7 @@ void testConstructorWithNegativeRangeStart() { IllegalArgumentException.class, () -> { BlockKey invalidBlockKey = new BlockKey(objectKey, new Range(-1, TEST_DATA.length())); - new Block(invalidBlockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + new Block(invalidBlockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); }); } @@ -106,13 +109,13 @@ void testConstructorWithNegativeRangeEnd() { IllegalArgumentException.class, () -> { BlockKey blockKey = new BlockKey(objectKey, new Range(0, -1)); - new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); }); } @Test void testSetDataAndIsDataReady() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); assertFalse(block.isDataReady()); @@ -125,7 +128,7 @@ void testSetDataAndIsDataReady() { @Test void testReadSingleByteAfterDataSet() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); int result = block.read(0); @@ -136,7 +139,7 @@ void testReadSingleByteAfterDataSet() throws IOException { @Test void testReadSingleByteAtDifferentPositions() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); assertEquals(116, block.read(0)); // 't' @@ -152,7 +155,7 @@ void testReadSingleByteAtDifferentPositions() throws IOException { @Test void testReadSingleByteWithNegativePosition() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); assertThrows(IllegalArgumentException.class, () -> block.read(-1)); @@ -160,7 +163,7 @@ void testReadSingleByteWithNegativePosition() { @Test void testReadBufferAfterDataSet() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); byte[] buffer = new byte[4]; @@ -173,7 +176,7 @@ void testReadBufferAfterDataSet() throws IOException { @Test void testReadBufferAtDifferentPositions() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); byte[] buffer1 = new byte[4]; @@ -189,7 +192,7 @@ void testReadBufferAtDifferentPositions() throws IOException { @Test void testReadBufferPartialRead() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); byte[] buffer = new byte[10]; @@ -201,7 +204,7 @@ void testReadBufferPartialRead() throws IOException { @Test void testReadBufferWithInvalidParameters() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); byte[] buffer = new byte[4]; @@ -214,14 +217,16 @@ void testReadBufferWithInvalidParameters() { @Test void testReadBeforeDataSet() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100); // Short timeout + Block block = + new Block(blockKey, 0, mockIndexCache, mockMetrics, 100, RETRY_COUNT); // Short timeout assertThrows(IOException.class, () -> block.read(0)); } @Test void testReadBufferBeforeDataSet() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100); // Short timeout + Block block = + new Block(blockKey, 0, mockIndexCache, mockMetrics, 100, RETRY_COUNT); // Short timeout byte[] buffer = new byte[4]; assertThrows(IOException.class, () -> block.read(buffer, 0, 4, 0)); @@ -229,7 +234,8 @@ void testReadBufferBeforeDataSet() { @Test void testReadWithTimeout() throws InterruptedException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100); // Short timeout + Block block = + new Block(blockKey, 0, mockIndexCache, mockMetrics, 100, RETRY_COUNT); // Short timeout CountDownLatch latch = new CountDownLatch(1); CompletableFuture readTask = @@ -252,7 +258,7 @@ void testReadWithTimeout() throws InterruptedException { @Test void testConcurrentReadsAfterDataSet() throws InterruptedException, ExecutionException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); int numThreads = 10; @@ -284,7 +290,7 @@ void testConcurrentReadsAfterDataSet() throws InterruptedException, ExecutionExc @Test void testCloseReleasesData() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); assertTrue(block.isDataReady()); @@ -297,7 +303,7 @@ void testCloseReleasesData() throws IOException { @Test void testMultipleSetDataCalls() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); assertTrue(block.isDataReady()); @@ -310,9 +316,9 @@ void testMultipleSetDataCalls() { @Test void testGenerationProperty() { - Block block1 = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); - Block block2 = new Block(blockKey, 5, mockIndexCache, mockMetrics, READ_TIMEOUT); - Block block3 = new Block(blockKey, 100, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block1 = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); + Block block2 = new Block(blockKey, 5, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); + Block block3 = new Block(blockKey, 100, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); assertEquals(0, block1.getGeneration()); assertEquals(5, block2.getGeneration()); @@ -321,7 +327,7 @@ void testGenerationProperty() { @Test void testReadIntoBuffer() throws IOException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, READ_TIMEOUT, RETRY_COUNT); block.setData(TEST_DATA_BYTES); byte[] buffer = new byte[20]; @@ -333,15 +339,15 @@ void testReadIntoBuffer() throws IOException { @Test void testReadTimeoutIfDataNeverSet() { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100); // 100 ms + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100, RETRY_COUNT); // 100 ms IOException ex = assertThrows(IOException.class, () -> block.read(0)); - assertTrue(ex.getMessage().contains("Failed to read data")); + assertTrue(ex.getMessage().contains("Error while reading data.")); } @Test void testReadBlocksUntilDataIsReady() throws Exception { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 1000); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 1000, RETRY_COUNT); ExecutorService executor = Executors.newSingleThreadExecutor(); Future result = executor.submit(() -> block.read(0)); @@ -356,7 +362,7 @@ void testReadBlocksUntilDataIsReady() throws Exception { @Test void testReadHandlesInterruptedException() throws InterruptedException { - Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 5000); + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 500, RETRY_COUNT); Thread testThread = new Thread( @@ -375,4 +381,90 @@ void testReadHandlesInterruptedException() throws InterruptedException { testThread.interrupt(); testThread.join(); } + + @Test + void testRetryStrategyWithTimeout() throws Exception { + // Create a counter to track retry attempts + AtomicInteger attempts = new AtomicInteger(0); + + // Create a custom CountDownLatch to coordinate the test + CountDownLatch attemptLatch = new CountDownLatch(1); + CountDownLatch dataSetLatch = new CountDownLatch(1); + + // Create a block with a short timeout and 2 retries + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 50, 2); + + // Start a thread that will try to read from the block + CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + // Record the attempt and notify the test thread + attempts.incrementAndGet(); + attemptLatch.countDown(); + + // Try to read - this will trigger retries internally + return block.read(0); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + + // Wait for the read attempt to start + assertTrue(attemptLatch.await(500, TimeUnit.MILLISECONDS)); + + // Wait a bit to ensure at least one retry happens (50ms timeout * 1 retry) + Thread.sleep(75); + + // Now set the data so the next retry will succeed + block.setData(TEST_DATA_BYTES); + dataSetLatch.countDown(); + + // The read should eventually succeed + assertEquals(Byte.toUnsignedInt(TEST_DATA_BYTES[0]), future.join()); + } + + @Test + void testRetryStrategyExhaustsRetries() { + // Create a block with a short timeout and only 1 retry + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100, 1); + + // Don't set data, so all retries will fail with timeout + + // This should fail after max retries + IOException exception = assertThrows(IOException.class, () -> block.read(0)); + assertTrue(exception.getMessage().contains("timed out")); + } + + @Test + void testRetryStrategyWithMultipleRetries() throws InterruptedException { + // Create a block with a short timeout and multiple retries + CountDownLatch readStarted = new CountDownLatch(1); + + Block block = new Block(blockKey, 0, mockIndexCache, mockMetrics, 100, 3); + + // Start a thread that will try to read from the block + CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + readStarted.countDown(); + return block.read(0); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + + // Wait for read to start + assertTrue(readStarted.await(500, TimeUnit.MILLISECONDS)); + + // Wait a bit to ensure at least one retry happens + Thread.sleep(250); + + // Now set the data + block.setData(TEST_DATA_BYTES); + + // The read should eventually succeed + assertEquals(Byte.toUnsignedInt(TEST_DATA_BYTES[0]), future.join()); + } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java index 5efd9691..46f8149d 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java @@ -302,7 +302,7 @@ void testReadTail() throws IOException { @SuppressWarnings("unchecked") @Test - public void test_FailureEvictsObjectsAsExpected() throws IOException { + public void test_FailureEvictsObjectsAsExpected() throws IOException, InterruptedException { AwsServiceException s3Exception = S3Exception.builder() .message("At least one of the pre-conditions you specified did not hold") @@ -325,7 +325,7 @@ public void test_FailureEvictsObjectsAsExpected() throws IOException { .thenReturn(failedFuture); S3SdkObjectClient client = new S3SdkObjectClient(mockS3AsyncClient); PhysicalIOConfiguration configuration = - PhysicalIOConfiguration.builder().blockReadTimeout(2_000).build(); + PhysicalIOConfiguration.builder().blockReadTimeout(200).blockReadRetryCount(2).build(); MetadataStore metadataStore = new MetadataStore(client, TestTelemetry.DEFAULT, configuration, mock(Metrics.class)); @@ -362,7 +362,7 @@ public void test_FailureEvictsObjectsAsExpected_WhenSDKClientGetsStuck() throws .thenReturn(failedFuture); S3SdkObjectClient client = new S3SdkObjectClient(mockS3AsyncClient); PhysicalIOConfiguration configuration = - PhysicalIOConfiguration.builder().blockReadTimeout(2_000).build(); + PhysicalIOConfiguration.builder().blockReadTimeout(200).build(); MetadataStore metadataStore = new MetadataStore(client, TestTelemetry.DEFAULT, configuration, mock(Metrics.class)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReaderTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReaderTest.java index 6074350f..6e9a90da 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReaderTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReaderTest.java @@ -35,6 +35,7 @@ import software.amazon.s3.analyticsaccelerator.common.Metrics; import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.common.telemetry.TelemetryConfiguration; +import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.physical.data.Block; import software.amazon.s3.analyticsaccelerator.request.*; import software.amazon.s3.analyticsaccelerator.util.BlockKey; @@ -57,6 +58,7 @@ public class StreamReaderTest { private RequestCallback mockRequestCallback; private Metrics mockMetrics; private Telemetry telemetry; + private PhysicalIOConfiguration configuration; private StreamReader streamReader; @@ -70,6 +72,7 @@ void setUp() { mockMetrics = mock(Metrics.class); mockRequestCallback = mock(RequestCallback.class); telemetry = Telemetry.createTelemetry(TelemetryConfiguration.DEFAULT); + configuration = PhysicalIOConfiguration.DEFAULT; openStreamInfo = OpenStreamInformation.builder().requestCallback(mockRequestCallback).build(); @@ -81,7 +84,8 @@ void setUp() { mockRemoveBlocksFunc, mockMetrics, openStreamInfo, - telemetry); + telemetry, + configuration); } @Test @@ -96,7 +100,8 @@ void test_initializeExceptions() { mockRemoveBlocksFunc, mockMetrics, openStreamInfo, - telemetry)); + telemetry, + configuration)); assertThrows( NullPointerException.class, @@ -108,7 +113,8 @@ void test_initializeExceptions() { mockRemoveBlocksFunc, mockMetrics, openStreamInfo, - telemetry)); + telemetry, + configuration)); assertThrows( NullPointerException.class, @@ -120,7 +126,8 @@ void test_initializeExceptions() { mockRemoveBlocksFunc, mockMetrics, openStreamInfo, - telemetry)); + telemetry, + configuration)); assertThrows( NullPointerException.class, @@ -132,7 +139,8 @@ void test_initializeExceptions() { null, mockMetrics, openStreamInfo, - telemetry)); + telemetry, + configuration)); assertThrows( NullPointerException.class, @@ -144,7 +152,8 @@ void test_initializeExceptions() { mockRemoveBlocksFunc, null, openStreamInfo, - telemetry)); + telemetry, + configuration)); assertThrows( NullPointerException.class, @@ -156,7 +165,8 @@ void test_initializeExceptions() { mockRemoveBlocksFunc, mockMetrics, null, - telemetry)); + telemetry, + configuration)); assertThrows( NullPointerException.class, @@ -168,6 +178,20 @@ void test_initializeExceptions() { mockRemoveBlocksFunc, mockMetrics, openStreamInfo, + null, + configuration)); + + assertThrows( + NullPointerException.class, + () -> + new StreamReader( + mockObjectClient, + mockObjectKey, + mockExecutorService, + mockRemoveBlocksFunc, + mockMetrics, + openStreamInfo, + telemetry, null)); }