Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ testcontainers-testcontainers = "1.20.2"
test-logger="4.0.0"
slf4j="2.0.16"
caffeine = "2.9.3"
failsafe = "3.3.2"


[libraries]
Expand All @@ -28,6 +29,7 @@ netty-nio-client = { module = "software.amazon.awssdk:netty-nio-client", version
parquet-format = { module = "org.apache.parquet:parquet-format", version.ref = "parquetFormat" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j"}
caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" }
failsafe = { module = "dev.failsafe:failsafe", version.ref = "failsafe"}

# Code maintenance and best practices
lombok = { module = "org.projectlombok:lombok", version.ref = "lombok" }
Expand Down
1 change: 1 addition & 0 deletions input-stream/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ dependencies {
implementation(libs.parquet.format)
implementation(libs.slf4j.api)
implementation(libs.caffeine)
implementation(libs.failsafe)

jmhImplementation(libs.s3)
jmhImplementation(libs.s3.transfer.manager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore;
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetLogicalIOImpl;
import software.amazon.s3.analyticsaccelerator.io.logical.impl.SequentialLogicalIOImpl;
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO;
import software.amazon.s3.analyticsaccelerator.io.physical.data.BlobStore;
import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore;
import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl;
Expand Down Expand Up @@ -141,44 +142,30 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati
case PARQUET:
return new ParquetLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI,
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation,
threadPool),
createPhysicalIO(s3URI, openStreamInformation),
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);

case SEQUENTIAL:
return new SequentialLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI,
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation,
threadPool),
createPhysicalIO(s3URI, openStreamInformation),
telemetry,
configuration.getLogicalIOConfiguration());

default:
return new DefaultLogicalIOImpl(
s3URI,
new PhysicalIOImpl(
s3URI,
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation,
threadPool),
telemetry);
s3URI, createPhysicalIO(s3URI, openStreamInformation), telemetry);
}
}

/**
 * Creates the {@link PhysicalIO} used to read the given object.
 *
 * <p>The instance is a {@link PhysicalIOImpl} constructed from the supplied URI and stream
 * information together with this factory's metadata store, blob store, telemetry and thread pool.
 *
 * @param s3URI the S3 URI of the object to read
 * @param openStreamInformation the stream information passed through to the physical IO
 * @return a new {@link PhysicalIO} for the object
 * @throws IOException if the physical IO cannot be created
 */
PhysicalIO createPhysicalIO(S3URI s3URI, OpenStreamInformation openStreamInformation)
    throws IOException {
  final PhysicalIO physicalIO =
      new PhysicalIOImpl(
          s3URI,
          objectMetadataStore,
          objectBlobStore,
          telemetry,
          openStreamInformation,
          threadPool);
  return physicalIO;
}

void storeObjectMetadata(S3URI s3URI, ObjectMetadata metadata) {
if (metadata != null) {
objectMetadataStore.storeObjectMetadata(s3URI, metadata);
Expand All @@ -188,7 +175,7 @@ void storeObjectMetadata(S3URI s3URI, ObjectMetadata metadata) {
/**
* Closes the factory and underlying resources.
*
* @throws IOException
* @throws IOException if any of the closures fail
*/
@Override
public void close() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
import software.amazon.s3.analyticsaccelerator.common.telemetry.Operation;
import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.request.GetRequest;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.Referrer;
import software.amazon.s3.analyticsaccelerator.retry.RetryExecutor;
import software.amazon.s3.analyticsaccelerator.retry.SeekableInputStreamRetryExecutor;
import software.amazon.s3.analyticsaccelerator.util.*;

/**
Expand All @@ -48,10 +51,10 @@ public class Block implements Closeable {
private final OpenStreamInformation openStreamInformation;
private final ReadMode readMode;
private final Referrer referrer;
private final long readTimeout;
private final int readRetryCount;
@Getter private final long generation;

@Getter private final PhysicalIOConfiguration configuration;
private final int readRetryCount;
private final long readTimeout;
private final Metrics aggregatingMetrics;
private final BlobStoreIndexCache indexCache;
private static final String OPERATION_BLOCK_GET_ASYNC = "block.get.async";
Expand All @@ -67,8 +70,7 @@ public class Block implements Closeable {
* @param telemetry an instance of {@link Telemetry} to use
* @param generation generation of the block in a sequential read pattern (should be 0 by default)
* @param readMode read mode describing whether this is a sync or async fetch
* @param readTimeout Timeout duration (in milliseconds) for reading a block object from S3
* @param readRetryCount Number of retries for block read failure
* @param configuration PhysicalIO configuration supplying the block read timeout and retry count
* @param aggregatingMetrics blobstore metrics
* @param indexCache blobstore index cache
* @param openStreamInformation contains stream information
Expand All @@ -79,8 +81,7 @@ public Block(
@NonNull Telemetry telemetry,
long generation,
@NonNull ReadMode readMode,
long readTimeout,
int readRetryCount,
@NonNull PhysicalIOConfiguration configuration,
@NonNull Metrics aggregatingMetrics,
@NonNull BlobStoreIndexCache indexCache,
@NonNull OpenStreamInformation openStreamInformation)
Expand All @@ -94,10 +95,6 @@ public Block(
Preconditions.checkArgument(0 <= end, "`end` must be non-negative; was: %s", end);
Preconditions.checkArgument(
start <= end, "`start` must be less than `end`; %s is not less than %s", start, end);
Preconditions.checkArgument(
0 < readTimeout, "`readTimeout` must be greater than 0; was %s", readTimeout);
Preconditions.checkArgument(
0 < readRetryCount, "`readRetryCount` must be greater than 0; was %s", readRetryCount);

this.generation = generation;
this.telemetry = telemetry;
Expand All @@ -106,80 +103,61 @@ public Block(
this.openStreamInformation = openStreamInformation;
this.readMode = readMode;
this.referrer = new Referrer(this.blockKey.getRange().toHttpString(), readMode);
this.readTimeout = readTimeout;
this.readRetryCount = readRetryCount;
this.aggregatingMetrics = aggregatingMetrics;
this.indexCache = indexCache;
generateSourceAndData();
this.configuration = configuration;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this variable ever used?
I think line 111 and lines 109-110 use the class variable vs. the constructor variable in different ways; maybe just remove this config?

this.readRetryCount = configuration.getBlockReadRetryCount();
this.readTimeout = configuration.getBlockReadTimeout();
RetryExecutor<Void> retryStrategy = new SeekableInputStreamRetryExecutor<>(this.configuration);
retryStrategy.executeWithRetry(this::generateSourceAndData);
}

/** Method to help construct source and data */
private void generateSourceAndData() throws IOException {
GetRequest getRequest =
GetRequest.builder()
.s3Uri(this.blockKey.getObjectKey().getS3URI())
.range(this.blockKey.getRange())
.etag(this.blockKey.getObjectKey().getEtag())
.referrer(referrer)
.build();

int retries = 0;
while (retries < this.readRetryCount) {
try {
GetRequest getRequest =
GetRequest.builder()
.s3Uri(this.blockKey.getObjectKey().getS3URI())
.range(this.blockKey.getRange())
.etag(this.blockKey.getObjectKey().getEtag())
.referrer(referrer)
.build();

openStreamInformation.getRequestCallback().onGetRequest();

this.source =
this.telemetry.measureCritical(
() ->
Operation.builder()
.name(OPERATION_BLOCK_GET_ASYNC)
.attribute(StreamAttributes.uri(this.blockKey.getObjectKey().getS3URI()))
.attribute(StreamAttributes.etag(this.blockKey.getObjectKey().getEtag()))
.attribute(StreamAttributes.range(this.blockKey.getRange()))
.attribute(StreamAttributes.generation(generation))
.build(),
() -> {
this.aggregatingMetrics.add(MetricKey.GET_REQUEST_COUNT, 1);
return objectClient.getObject(getRequest, openStreamInformation);
});

// Handle IOExceptions when converting stream to byte array
this.data =
this.source.thenApply(
objectContent -> {
try {
byte[] bytes =
StreamUtils.toByteArray(
objectContent,
this.blockKey.getObjectKey(),
this.blockKey.getRange(),
this.readTimeout);
int blockRange = blockKey.getRange().getLength();
this.aggregatingMetrics.add(MetricKey.MEMORY_USAGE, blockRange);
this.indexCache.put(blockKey, blockRange);
return bytes;
} catch (IOException | TimeoutException e) {
throw new RuntimeException(
"Error while converting InputStream to byte array", e);
}
});
openStreamInformation.getRequestCallback().onGetRequest();

return; // Successfully generated source and data, exit loop
} catch (RuntimeException e) {
retries++;
LOG.debug(
"Retry {}/{} - Failed to fetch block data due to: {}",
retries,
this.readRetryCount,
e.getMessage());
this.source =
this.telemetry.measureCritical(
() ->
Operation.builder()
.name(OPERATION_BLOCK_GET_ASYNC)
.attribute(StreamAttributes.uri(this.blockKey.getObjectKey().getS3URI()))
.attribute(StreamAttributes.etag(this.blockKey.getObjectKey().getEtag()))
.attribute(StreamAttributes.range(this.blockKey.getRange()))
.attribute(StreamAttributes.generation(generation))
.build(),
() -> {
this.aggregatingMetrics.add(MetricKey.GET_REQUEST_COUNT, 1);
return objectClient.getObject(getRequest, openStreamInformation);
});

if (retries >= this.readRetryCount) {
LOG.error("Max retries reached. Unable to fetch block data.");
throw new IOException("Failed to fetch block data after retries", e);
}
}
}
// Handle IOExceptions when converting stream to byte array
this.data =
this.source.thenApply(
objectContent -> {
try {
byte[] bytes =
StreamUtils.toByteArray(
objectContent,
this.blockKey.getObjectKey(),
this.blockKey.getRange(),
this.readTimeout);
int blockRange = blockKey.getRange().getLength();
this.aggregatingMetrics.add(MetricKey.MEMORY_USAGE, blockRange);
this.indexCache.put(blockKey, blockRange);
return bytes;
} catch (IOException | TimeoutException e) {
throw new RuntimeException("Error while converting InputStream to byte array", e);
}
});
}

/** @return if data is loaded */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod
telemetry,
generation,
readMode,
this.configuration.getBlockReadTimeout(),
this.configuration.getBlockReadRetryCount(),
this.configuration,
aggregatingMetrics,
indexCache,
openStreamInformation);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.s3.analyticsaccelerator.retry;

import java.io.IOException;

/**
* Interface for executing operations with retry logic. We do not expect consumers of AAL to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't the one design include consumers having access to this?

* instantiate an implementation of this interface themselves. This is for AAL's internal use only.
*
* <p>This interface provides a contract for executing operations that may fail and need to be
* retried according to a configured retry policy. Implementations should handle the retry logic
* internally and only throw exceptions when all retry attempts have been exhausted.
*
* @param <R> the result type of operations executed by this retry executor
*/
public interface RetryExecutor<R> {
/**
* Executes a runnable with retry logic according to the configured retry policy. This variant
* returns no value; see {@link #getWithRetry(IOSupplier)} for operations that produce a result.
*
* <p>The operation will be retried according to the retry policy configuration, including maximum
* retry attempts, delays between retries, and which exceptions should trigger retries. If all
* retry attempts are exhausted, the final exception will be wrapped in an IOException and thrown.
*
* @param runnable the operation to execute, which may throw IOException
* @throws IOException if the operation fails after all retry attempts are exhausted
* @throws NullPointerException if {@code runnable} is null
*/
void executeWithRetry(IORunnable runnable) throws IOException;

/**
* Executes a supplier with retry logic according to the configured retry policy.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 lines are basically saying the same thing about using the retry policy; maybe just merge them?

* <p>The operation will be retried according to the retry policy configuration, including maximum
* retry attempts, delays between retries, and which exceptions should trigger retries. If all
* retry attempts are exhausted, the final exception will be wrapped in an IOException and thrown.
*
* @param supplier the operation to execute; returns {@code R} and may throw IOException
* @return the result produced by the supplier
* @throws IOException if the operation fails after all retry attempts are exhausted
*/
R getWithRetry(IOSupplier<R> supplier) throws IOException;

/** Functional interface for operations that can throw IOException. */
@FunctionalInterface
interface IORunnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this interface and the interface below could be handy in the future; should we move them to their own classes?

/**
* Runs the operation.
*
* <p>This method should contain the logic that needs to be executed with retry support. The
* operation should be idempotent as it may be executed multiple times in case of failures.
*
* @throws IOException if the operation fails due to an I/O error
*/
void run() throws IOException;
}

/**
* Functional interface for value-returning operations that can throw IOException.
*
* @param <R> the type of the result returned by the operation
*/
@FunctionalInterface
interface IOSupplier<R> {
/**
* Runs the operation and returns its result.
*
* <p>This method should contain the logic that needs to be executed with retry support. The
* operation should be idempotent as it may be executed multiple times in case of failures.
*
* @return the result of the operation
* @throws IOException if the operation fails due to an I/O error
*/
R get() throws IOException;
}
}
Loading
Loading