Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;

/** Represents APIs of an Amazon S3 compatible object store */
public interface ObjectClient extends Closeable {
Expand All @@ -25,24 +26,19 @@ public interface ObjectClient extends Closeable {
* Make a headObject request to the object store.
*
* @param headRequest The HEAD request to be sent
* @param openStreamInformation contains stream information
* @return an instance of {@link CompletableFuture} of type {@link ObjectMetadata}
*/
CompletableFuture<ObjectMetadata> headObject(HeadRequest headRequest);
CompletableFuture<ObjectMetadata> headObject(
HeadRequest headRequest, OpenStreamInformation openStreamInformation);

/**
* Make a getObject request to the object store.
*
* @param getRequest The GET request to be sent
* @param openStreamInformation contains stream information
* @return an instance of {@link CompletableFuture} of type {@link ObjectContent}
*/
CompletableFuture<ObjectContent> getObject(GetRequest getRequest);

/**
* Make a getObject request to the object store.
*
* @param getRequest The GET request to be sent
* @param streamContext audit headers to be attached in the request header
* @return an instance of {@link CompletableFuture} of type {@link ObjectContent}
*/
CompletableFuture<ObjectContent> getObject(GetRequest getRequest, StreamContext streamContext);
CompletableFuture<ObjectContent> getObject(
GetRequest getRequest, OpenStreamInformation openStreamInformation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation.getStreamContext(),
openStreamInformation,
threadPool),
telemetry,
configuration.getLogicalIOConfiguration(),
Expand All @@ -159,7 +159,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation.getStreamContext(),
openStreamInformation,
threadPool),
telemetry,
configuration.getLogicalIOConfiguration());
Expand All @@ -172,7 +172,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation.getStreamContext(),
openStreamInformation,
threadPool),
telemetry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.MetricComputationUtils;
import software.amazon.s3.analyticsaccelerator.util.MetricKey;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;

/** A BlobStore is a container for Blobs and functions as a data cache. */
@SuppressFBWarnings(
Expand Down Expand Up @@ -120,10 +120,11 @@ void asyncCleanup() {
*
* @param objectKey the etag and S3 URI of the object
* @param metadata the metadata for the object we are computing
* @param streamContext contains audit headers to be attached in the request header
* @param openStreamInformation contains stream information
* @return the blob representing the object from the BlobStore
*/
public Blob get(ObjectKey objectKey, ObjectMetadata metadata, StreamContext streamContext) {
public Blob get(
ObjectKey objectKey, ObjectMetadata metadata, OpenStreamInformation openStreamInformation) {
return blobMap.computeIfAbsent(
objectKey,
uri ->
Expand All @@ -138,7 +139,7 @@ public Blob get(ObjectKey objectKey, ObjectMetadata metadata, StreamContext stre
configuration,
metrics,
indexCache,
streamContext),
openStreamInformation),
telemetry));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectContent;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.Referrer;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.*;

/**
Expand All @@ -46,7 +45,7 @@ public class Block implements Closeable {
@Getter private final BlockKey blockKey;
private final Telemetry telemetry;
private final ObjectClient objectClient;
private final StreamContext streamContext;
private final OpenStreamInformation openStreamInformation;
private final ReadMode readMode;
private final Referrer referrer;
private final long readTimeout;
Expand All @@ -72,45 +71,7 @@ public class Block implements Closeable {
* @param readRetryCount Number of retries for block read failure
* @param aggregatingMetrics blobstore metrics
* @param indexCache blobstore index cache
*/
public Block(
@NonNull BlockKey blockKey,
@NonNull ObjectClient objectClient,
@NonNull Telemetry telemetry,
long generation,
@NonNull ReadMode readMode,
long readTimeout,
int readRetryCount,
@NonNull Metrics aggregatingMetrics,
@NonNull BlobStoreIndexCache indexCache)
throws IOException {

this(
blockKey,
objectClient,
telemetry,
generation,
readMode,
readTimeout,
readRetryCount,
aggregatingMetrics,
indexCache,
null);
}

/**
* Constructs a Block data.
*
* @param blockKey the objectkey and range of the object
* @param objectClient the object client to use to interact with the object store
* @param telemetry an instance of {@link Telemetry} to use
* @param generation generation of the block in a sequential read pattern (should be 0 by default)
* @param readMode read mode describing whether this is a sync or async fetch
* @param readTimeout Timeout duration (in milliseconds) for reading a block object from S3
* @param readRetryCount Number of retries for block read failure
* @param aggregatingMetrics blobstore metrics
* @param indexCache blobstore index cache
* @param streamContext contains audit headers to be attached in the request header
* @param openStreamInformation contains stream information
*/
public Block(
@NonNull BlockKey blockKey,
Expand All @@ -122,7 +83,7 @@ public Block(
int readRetryCount,
@NonNull Metrics aggregatingMetrics,
@NonNull BlobStoreIndexCache indexCache,
StreamContext streamContext)
@NonNull OpenStreamInformation openStreamInformation)
throws IOException {

long start = blockKey.getRange().getStart();
Expand All @@ -142,7 +103,7 @@ public Block(
this.telemetry = telemetry;
this.blockKey = blockKey;
this.objectClient = objectClient;
this.streamContext = streamContext;
this.openStreamInformation = openStreamInformation;
this.readMode = readMode;
this.referrer = new Referrer(this.blockKey.getRange().toHttpString(), readMode);
this.readTimeout = readTimeout;
Expand Down Expand Up @@ -176,7 +137,7 @@ private void generateSourceAndData() throws IOException {
.attribute(StreamAttributes.range(this.blockKey.getRange()))
.attribute(StreamAttributes.generation(generation))
.build(),
objectClient.getObject(getRequest, streamContext));
objectClient.getObject(getRequest, openStreamInformation));

// Handle IOExceptions when converting stream to byte array
this.data =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.Range;
import software.amazon.s3.analyticsaccelerator.request.ReadMode;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;
import software.amazon.s3.analyticsaccelerator.util.*;

/** Implements a Block Manager responsible for planning and scheduling reads on a key. */
Expand All @@ -49,43 +48,13 @@ public class BlockManager implements Closeable {
private final IOPlanner ioPlanner;
private final PhysicalIOConfiguration configuration;
private final RangeOptimiser rangeOptimiser;
private StreamContext streamContext;
private OpenStreamInformation openStreamInformation;
private final Metrics aggregatingMetrics;
private final BlobStoreIndexCache indexCache;
private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available";

private static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);

/**
* Constructs a new BlockManager.
*
* @param objectKey the etag and S3 URI of the object
* @param objectClient object client capable of interacting with the underlying object store
* @param telemetry an instance of {@link Telemetry} to use
* @param metadata the metadata for the object we are reading
* @param configuration the physicalIO configuration
* @param aggregatingMetrics factory metrics
* @param indexCache blobstore index cache
*/
public BlockManager(
@NonNull ObjectKey objectKey,
@NonNull ObjectClient objectClient,
@NonNull ObjectMetadata metadata,
@NonNull Telemetry telemetry,
@NonNull PhysicalIOConfiguration configuration,
@NonNull Metrics aggregatingMetrics,
@NonNull BlobStoreIndexCache indexCache) {
this(
objectKey,
objectClient,
metadata,
telemetry,
configuration,
aggregatingMetrics,
indexCache,
null);
}

/**
* Constructs a new BlockManager.
*
Expand All @@ -96,7 +65,7 @@ public BlockManager(
* @param configuration the physicalIO configuration
* @param aggregatingMetrics factory metrics
* @param indexCache blobstore index cache
* @param streamContext contains audit headers to be attached in the request header
* @param openStreamInformation contains stream information
*/
public BlockManager(
@NonNull ObjectKey objectKey,
Expand All @@ -106,7 +75,7 @@ public BlockManager(
@NonNull PhysicalIOConfiguration configuration,
@NonNull Metrics aggregatingMetrics,
@NonNull BlobStoreIndexCache indexCache,
StreamContext streamContext) {
@NonNull OpenStreamInformation openStreamInformation) {
this.objectKey = objectKey;
this.objectClient = objectClient;
this.metadata = metadata;
Expand All @@ -119,7 +88,7 @@ public BlockManager(
this.sequentialReadProgression = new SequentialReadProgression(configuration);
this.ioPlanner = new IOPlanner(blockStore);
this.rangeOptimiser = new RangeOptimiser(configuration);
this.streamContext = streamContext;
this.openStreamInformation = openStreamInformation;

prefetchSmallObject();
}
Expand Down Expand Up @@ -254,7 +223,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod
this.configuration.getBlockReadRetryCount(),
aggregatingMetrics,
indexCache,
streamContext);
openStreamInformation);
blockStore.add(blockKey, block);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import software.amazon.s3.analyticsaccelerator.request.HeadRequest;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

Expand Down Expand Up @@ -79,17 +80,19 @@ protected boolean removeEldestEntry(
* store).
*
* @param s3URI the object to fetch the metadata for
* @param openStreamInformation contains the open stream information
* @return returns the {@link ObjectMetadata}.
* @throws IOException if an I/O error occurs
*/
public ObjectMetadata get(S3URI s3URI) throws IOException {
public ObjectMetadata get(S3URI s3URI, OpenStreamInformation openStreamInformation)
throws IOException {
return telemetry.measureJoinCritical(
() ->
Operation.builder()
.name(OPERATION_METADATA_HEAD_JOIN)
.attribute(StreamAttributes.uri(s3URI))
.build(),
this.asyncGet(s3URI),
this.asyncGet(s3URI, openStreamInformation),
this.configuration.getBlockReadTimeout());
}

Expand All @@ -108,9 +111,11 @@ public boolean evictKey(S3URI s3URI) {
* store).
*
* @param s3URI the object to fetch the metadata for
* @param openStreamInformation contains the open stream information
* @return returns the {@link CompletableFuture} that holds object's metadata.
*/
public synchronized CompletableFuture<ObjectMetadata> asyncGet(S3URI s3URI) {
public synchronized CompletableFuture<ObjectMetadata> asyncGet(
S3URI s3URI, OpenStreamInformation openStreamInformation) {
return this.cache.computeIfAbsent(
s3URI,
uri ->
Expand All @@ -120,7 +125,8 @@ public synchronized CompletableFuture<ObjectMetadata> asyncGet(S3URI s3URI) {
.name(OPERATION_METADATA_HEAD_ASYNC)
.attribute(StreamAttributes.uri(s3URI))
.build(),
objectClient.headObject(HeadRequest.builder().s3Uri(s3URI).build())));
objectClient.headObject(
HeadRequest.builder().s3Uri(s3URI).build(), openStreamInformation)));
}

/**
Expand Down
Loading
Loading