Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.s3.analyticsaccelerator.common.Metrics;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.retry.RetryPolicy;
import software.amazon.s3.analyticsaccelerator.retry.RetryStrategy;
import software.amazon.s3.analyticsaccelerator.retry.SeekableInputStreamRetryStrategy;
import software.amazon.s3.analyticsaccelerator.util.BlockKey;
import software.amazon.s3.analyticsaccelerator.util.MetricKey;

Expand All @@ -49,6 +53,9 @@ public class Block implements Closeable {
private final BlobStoreIndexCache indexCache;
private final Metrics aggregatingMetrics;
private final long readTimeout;
private final int retryCount;
private final RetryStrategy<Void> retryStrategy;

/**
* A synchronization aid that allows threads to wait until the block's data is available.
*
Expand All @@ -75,13 +82,15 @@ public class Block implements Closeable {
* @param indexCache blobstore index cache
* @param aggregatingMetrics blobstore metrics
* @param readTimeout read timeout in milliseconds
* @param retryCount number of retries
*/
public Block(
@NonNull BlockKey blockKey,
long generation,
@NonNull BlobStoreIndexCache indexCache,
@NonNull Metrics aggregatingMetrics,
long readTimeout) {
long readTimeout,
int retryCount) {
Preconditions.checkArgument(
0 <= generation, "`generation` must be non-negative; was: %s", generation);

Expand All @@ -90,6 +99,27 @@ public Block(
this.indexCache = indexCache;
this.aggregatingMetrics = aggregatingMetrics;
this.readTimeout = readTimeout;
this.retryCount = retryCount;
this.retryStrategy = createRetryStrategy();
}

/**
 * Builds the retry strategy stored in {@code retryStrategy} for this block.
 *
 * <p>When {@code readTimeout} is positive, the strategy is created from a {@link RetryPolicy}
 * that handles {@link InterruptedException}, {@link TimeoutException} and {@link IOException}
 * and is capped at {@code retryCount} retries. Otherwise a default-constructed
 * {@link SeekableInputStreamRetryStrategy} is returned.
 *
 * <p>This method reads the {@code readTimeout} and {@code retryCount} fields, so both must be
 * assigned before it is invoked.
 *
 * @return the {@link RetryStrategy} to use for this block
 */
@SuppressWarnings("unchecked") // presumably needed for the generic varargs of handle(...)
private RetryStrategy<Void> createRetryStrategy() {
  // No positive timeout configured: fall back to the default strategy.
  if (this.readTimeout <= 0) {
    return new SeekableInputStreamRetryStrategy<>();
  }

  RetryPolicy<Void> retryOnReadFailure =
      RetryPolicy.<Void>builder()
          .handle(InterruptedException.class, TimeoutException.class, IOException.class)
          .withMaxRetries(this.retryCount)
          .build();
  return new SeekableInputStreamRetryStrategy<>(retryOnReadFailure);
}

/**
Expand All @@ -101,7 +131,7 @@ public Block(
*/
public int read(long pos) throws IOException {
Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
awaitData();
awaitDataWithRetry();
indexCache.recordAccess(this.blockKey);
int contentOffset = posToOffset(pos);
return Byte.toUnsignedInt(this.data[contentOffset]);
Expand All @@ -124,7 +154,7 @@ public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOExcep
Preconditions.checkArgument(0 <= len, "`len` must not be negative");
Preconditions.checkArgument(off < buf.length, "`off` must be less than size of buffer");

awaitData();
awaitDataWithRetry();

indexCache.recordAccess(this.blockKey);
int contentOffset = posToOffset(pos);
Expand Down Expand Up @@ -168,6 +198,14 @@ public void setData(final byte[] data) {
dataReadyLatch.countDown();
}

/**
 * Waits for the block's data by running {@link #awaitData()} through {@code retryStrategy}.
 *
 * @throws IOException declared by this method; NOTE(review): presumably propagated from
 *     {@code awaitData()} once the retry strategy gives up - confirm against RetryStrategy#get
 */
private void awaitDataWithRetry() throws IOException {
this.retryStrategy.get(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if awaitData is not returning anything, you can use retryStrategy.execute() instead, instead of returning null

() -> {
awaitData();
// awaitData() produces no value; null is returned only to satisfy the value-returning
// callback that get() takes.
return null;
});
}

/**
* Waits for the block's data to become available. This method blocks until {@link
* #setData(byte[])} is called.
Expand All @@ -178,13 +216,16 @@ private void awaitData() throws IOException {
try {
if (!dataReadyLatch.await(readTimeout, TimeUnit.MILLISECONDS)) {
throw new IOException(
"Failed to read data", new IOException("Request timed out to fill the block"));
"Error while reading data. Request timed out after "
+ readTimeout
+ "ms while waiting for block data");
}
} catch (InterruptedException e) {
throw new IOException("Failed to read data", e);
throw new IOException("Error while reading data. Read interrupted while waiting for data", e);
}

if (data == null) throw new IOException("Failed to read data");
if (data == null)
throw new IOException("Error while reading data. Block data is null after successful await");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public BlockManager(
this::removeBlocks,
aggregatingMetrics,
openStreamInformation,
telemetry);
telemetry,
configuration);
this.sequentialReadProgression = new SequentialReadProgression(configuration);
this.rangeOptimiser = new RangeOptimiser(configuration);

Expand Down Expand Up @@ -213,7 +214,8 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod
generation,
this.indexCache,
this.aggregatingMetrics,
this.configuration.getBlockReadTimeout());
this.configuration.getBlockReadTimeout(),
this.configuration.getBlockReadRetryCount());
// Add block to the store for future reference
blockStore.add(block);
blocksToFill.add(block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,30 @@ private void makeReadVectoredRangesAvailable(List<ObjectRange> objectRanges) {
execute(new IOPlan(ranges), ReadMode.READ_VECTORED);
}

/**
* Handles exceptions from operations by evicting keys from caches when appropriate.
*
* @param e The exception to handle
*/
private void handleOperationExceptions(Exception e) {
boolean shouldEvict = false;

// Check for IO errors while reading data
if (e instanceof IOException
&& e.getMessage() != null
&& e.getMessage().contains("Error while reading data.")) {
shouldEvict = true;
}

// Check for precondition failed errors (412)
if (e.getCause() != null
&& e.getCause().getMessage() != null
&& (e.getCause().getMessage().contains("Status Code: 412")
|| e.getCause().getMessage().contains("Error while getting block")
|| e.getCause().getMessage().contains("Failed to read data")
|| e.getCause().getMessage().contains("Request timed out to fill the block"))) {
&& e.getCause().getMessage().contains("Status Code: 412")) {
shouldEvict = true;
}

// Evict keys if needed
if (shouldEvict) {
try {
metadataStore.evictKey(this.objectKey.getS3URI());
} finally {
Expand Down
Loading