diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ObjectClient.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ObjectClient.java index 93d11916..13ed5393 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ObjectClient.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ObjectClient.java @@ -17,6 +17,7 @@ import java.io.Closeable; import java.util.concurrent.CompletableFuture; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; /** Represents APIs of an Amazon S3 compatible object store */ public interface ObjectClient extends Closeable { @@ -25,24 +26,19 @@ public interface ObjectClient extends Closeable { * Make a headObject request to the object store. * * @param headRequest The HEAD request to be sent + * @param openStreamInformation contains stream information * @return an instance of {@link CompletableFuture} of type {@link ObjectMetadata} */ - CompletableFuture headObject(HeadRequest headRequest); + CompletableFuture headObject( + HeadRequest headRequest, OpenStreamInformation openStreamInformation); /** * Make a getObject request to the object store. * * @param getRequest The GET request to be sent + * @param openStreamInformation contains stream information * @return an instance of {@link CompletableFuture} of type {@link ObjectContent} */ - CompletableFuture getObject(GetRequest getRequest); - - /** - * Make a getObject request to the object store. - * - * @param getRequest The GET request to be sent - * @param streamContext audit headers to be attached in the request header - * @return an instance of {@link CompletableFuture} of type {@link ObjectContent} - */ - CompletableFuture getObject(GetRequest getRequest, StreamContext streamContext); + CompletableFuture getObject( + GetRequest getRequest, OpenStreamInformation openStreamInformation); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index 13dab7a5..6b31cd9c 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -145,7 +145,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati objectMetadataStore, objectBlobStore, telemetry, - openStreamInformation.getStreamContext(), + openStreamInformation, threadPool), telemetry, configuration.getLogicalIOConfiguration(), @@ -159,7 +159,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati objectMetadataStore, objectBlobStore, telemetry, - openStreamInformation.getStreamContext(), + openStreamInformation, threadPool), telemetry, configuration.getLogicalIOConfiguration()); @@ -172,7 +172,7 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati objectMetadataStore, objectBlobStore, telemetry, - openStreamInformation.getStreamContext(), + openStreamInformation, threadPool), telemetry); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java index 57514b27..318eb9ba 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStore.java @@ -33,10 +33,10 @@ import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.MetricComputationUtils; import software.amazon.s3.analyticsaccelerator.util.MetricKey; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; /** A BlobStore is a container for Blobs and functions as a data cache. */ @SuppressFBWarnings( @@ -120,10 +120,11 @@ void asyncCleanup() { * * @param objectKey the etag and S3 URI of the object * @param metadata the metadata for the object we are computing - * @param streamContext contains audit headers to be attached in the request header + * @param openStreamInformation contains stream information * @return the blob representing the object from the BlobStore */ - public Blob get(ObjectKey objectKey, ObjectMetadata metadata, StreamContext streamContext) { + public Blob get( + ObjectKey objectKey, ObjectMetadata metadata, OpenStreamInformation openStreamInformation) { return blobMap.computeIfAbsent( objectKey, uri -> @@ -138,7 +139,7 @@ public Blob get(ObjectKey objectKey, ObjectMetadata metadata, StreamContext stre configuration, metrics, indexCache, - streamContext), + openStreamInformation), telemetry)); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java index 92d4b336..aa02c776 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Block.java @@ -33,7 +33,6 @@ import software.amazon.s3.analyticsaccelerator.request.ObjectContent; import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.request.Referrer; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.*; /** @@ -46,7 +45,7 @@ public class Block implements Closeable { @Getter private final BlockKey blockKey; private final Telemetry telemetry; private final ObjectClient objectClient; - private final StreamContext streamContext; + private final OpenStreamInformation openStreamInformation; private final ReadMode readMode; private final Referrer referrer; private final long readTimeout; @@ -72,45 +71,7 @@ public class Block implements Closeable { * @param readRetryCount Number of retries for block read failure * @param aggregatingMetrics blobstore metrics * @param indexCache blobstore index cache - */ - public Block( - @NonNull BlockKey blockKey, - @NonNull ObjectClient objectClient, - @NonNull Telemetry telemetry, - long generation, - @NonNull ReadMode readMode, - long readTimeout, - int readRetryCount, - @NonNull Metrics aggregatingMetrics, - @NonNull BlobStoreIndexCache indexCache) - throws IOException { - - this( - blockKey, - objectClient, - telemetry, - generation, - readMode, - readTimeout, - readRetryCount, - aggregatingMetrics, - indexCache, - null); - } - - /** - * Constructs a Block data. - * - * @param blockKey the objectkey and range of the object - * @param objectClient the object client to use to interact with the object store - * @param telemetry an instance of {@link Telemetry} to use - * @param generation generation of the block in a sequential read pattern (should be 0 by default) - * @param readMode read mode describing whether this is a sync or async fetch - * @param readTimeout Timeout duration (in milliseconds) for reading a block object from S3 - * @param readRetryCount Number of retries for block read failure - * @param aggregatingMetrics blobstore metrics - * @param indexCache blobstore index cache - * @param streamContext contains audit headers to be attached in the request header + * @param openStreamInformation contains stream information */ public Block( @NonNull BlockKey blockKey, @@ -122,7 +83,7 @@ public Block( int readRetryCount, @NonNull Metrics aggregatingMetrics, @NonNull BlobStoreIndexCache indexCache, - StreamContext streamContext) + @NonNull OpenStreamInformation openStreamInformation) throws IOException { long start = blockKey.getRange().getStart(); @@ -142,7 +103,7 @@ public Block( this.telemetry = telemetry; this.blockKey = blockKey; this.objectClient = objectClient; - this.streamContext = streamContext; + this.openStreamInformation = openStreamInformation; this.readMode = readMode; this.referrer = new Referrer(this.blockKey.getRange().toHttpString(), readMode); this.readTimeout = readTimeout; @@ -176,7 +137,7 @@ private void generateSourceAndData() throws IOException { .attribute(StreamAttributes.range(this.blockKey.getRange())) .attribute(StreamAttributes.generation(generation)) .build(), - objectClient.getObject(getRequest, streamContext)); + objectClient.getObject(getRequest, openStreamInformation)); // Handle IOExceptions when converting stream to byte array this.data = diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index cd298dee..8b2d4bc2 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -34,7 +34,6 @@ import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.request.Range; import software.amazon.s3.analyticsaccelerator.request.ReadMode; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.*; /** Implements a Block Manager responsible for planning and scheduling reads on a key. */ @@ -49,43 +48,13 @@ public class BlockManager implements Closeable { private final IOPlanner ioPlanner; private final PhysicalIOConfiguration configuration; private final RangeOptimiser rangeOptimiser; - private StreamContext streamContext; + private OpenStreamInformation openStreamInformation; private final Metrics aggregatingMetrics; private final BlobStoreIndexCache indexCache; private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available"; private static final Logger LOG = LoggerFactory.getLogger(BlockManager.class); - /** - * Constructs a new BlockManager. - * - * @param objectKey the etag and S3 URI of the object - * @param objectClient object client capable of interacting with the underlying object store - * @param telemetry an instance of {@link Telemetry} to use - * @param metadata the metadata for the object we are reading - * @param configuration the physicalIO configuration - * @param aggregatingMetrics factory metrics - * @param indexCache blobstore index cache - */ - public BlockManager( - @NonNull ObjectKey objectKey, - @NonNull ObjectClient objectClient, - @NonNull ObjectMetadata metadata, - @NonNull Telemetry telemetry, - @NonNull PhysicalIOConfiguration configuration, - @NonNull Metrics aggregatingMetrics, - @NonNull BlobStoreIndexCache indexCache) { - this( - objectKey, - objectClient, - metadata, - telemetry, - configuration, - aggregatingMetrics, - indexCache, - null); - } - /** * Constructs a new BlockManager. * @@ -96,7 +65,7 @@ public BlockManager( * @param configuration the physicalIO configuration * @param aggregatingMetrics factory metrics * @param indexCache blobstore index cache - * @param streamContext contains audit headers to be attached in the request header + * @param openStreamInformation contains stream information */ public BlockManager( @NonNull ObjectKey objectKey, @@ -106,7 +75,7 @@ public BlockManager( @NonNull PhysicalIOConfiguration configuration, @NonNull Metrics aggregatingMetrics, @NonNull BlobStoreIndexCache indexCache, - StreamContext streamContext) { + @NonNull OpenStreamInformation openStreamInformation) { this.objectKey = objectKey; this.objectClient = objectClient; this.metadata = metadata; @@ -119,7 +88,7 @@ public BlockManager( this.sequentialReadProgression = new SequentialReadProgression(configuration); this.ioPlanner = new IOPlanner(blockStore); this.rangeOptimiser = new RangeOptimiser(configuration); - this.streamContext = streamContext; + this.openStreamInformation = openStreamInformation; prefetchSmallObject(); } @@ -254,7 +223,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod this.configuration.getBlockReadRetryCount(), aggregatingMetrics, indexCache, - streamContext); + openStreamInformation); blockStore.add(blockKey, block); } }); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStore.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStore.java index f4350194..4249eed8 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStore.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStore.java @@ -31,6 +31,7 @@ import software.amazon.s3.analyticsaccelerator.request.HeadRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -79,17 +80,19 @@ protected boolean removeEldestEntry( * store). * * @param s3URI the object to fetch the metadata for + * @param openStreamInformation contains the open stream information * @return returns the {@link ObjectMetadata}. * @throws IOException if an I/O error occurs */ - public ObjectMetadata get(S3URI s3URI) throws IOException { + public ObjectMetadata get(S3URI s3URI, OpenStreamInformation openStreamInformation) + throws IOException { return telemetry.measureJoinCritical( () -> Operation.builder() .name(OPERATION_METADATA_HEAD_JOIN) .attribute(StreamAttributes.uri(s3URI)) .build(), - this.asyncGet(s3URI), + this.asyncGet(s3URI, openStreamInformation), this.configuration.getBlockReadTimeout()); } @@ -108,9 +111,11 @@ public boolean evictKey(S3URI s3URI) { * store). * * @param s3URI the object to fetch the metadata for + * @param openStreamInformation contains the open stream information * @return returns the {@link CompletableFuture} that holds object's metadata. */ - public synchronized CompletableFuture asyncGet(S3URI s3URI) { + public synchronized CompletableFuture asyncGet( + S3URI s3URI, OpenStreamInformation openStreamInformation) { return this.cache.computeIfAbsent( s3URI, uri -> @@ -120,7 +125,8 @@ public synchronized CompletableFuture asyncGet(S3URI s3URI) { .name(OPERATION_METADATA_HEAD_ASYNC) .attribute(StreamAttributes.uri(s3URI)) .build(), - objectClient.headObject(HeadRequest.builder().s3Uri(s3URI).build()))); + objectClient.headObject( + HeadRequest.builder().s3Uri(s3URI).build(), openStreamInformation))); } /** diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index 06e099e8..9639f377 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -35,8 +35,8 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlan; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -45,7 +45,7 @@ public class PhysicalIOImpl implements PhysicalIO { private MetadataStore metadataStore; private BlobStore blobStore; private final Telemetry telemetry; - private final StreamContext streamContext; + private final OpenStreamInformation openStreamInformation; private ObjectKey objectKey; private final ObjectMetadata metadata; private final ExecutorService threadPool; @@ -67,6 +67,7 @@ public class PhysicalIOImpl implements PhysicalIO { * @param metadataStore a metadata cache * @param blobStore a data cache * @param telemetry The {@link Telemetry} to use to report measurements. + * @param openStreamInformation contains stream information * @param threadPool Thread pool for async operations */ public PhysicalIOImpl( @@ -74,34 +75,14 @@ public PhysicalIOImpl( @NonNull MetadataStore metadataStore, @NonNull BlobStore blobStore, @NonNull Telemetry telemetry, - @NonNull ExecutorService threadPool) - throws IOException { - this(s3URI, metadataStore, blobStore, telemetry, null, threadPool); - } - - /** - * Construct a new instance of PhysicalIOV2. - * - * @param s3URI the S3 URI of the object - * @param metadataStore a metadata cache - * @param blobStore a data cache - * @param telemetry The {@link Telemetry} to use to report measurements. - * @param streamContext contains audit headers to be attached in the request header - * @param threadPool Thread pool for async operations - */ - public PhysicalIOImpl( - @NonNull S3URI s3URI, - @NonNull MetadataStore metadataStore, - @NonNull BlobStore blobStore, - @NonNull Telemetry telemetry, - StreamContext streamContext, + @NonNull OpenStreamInformation openStreamInformation, @NonNull ExecutorService threadPool) throws IOException { this.metadataStore = metadataStore; this.blobStore = blobStore; this.telemetry = telemetry; - this.streamContext = streamContext; - this.metadata = this.metadataStore.get(s3URI); + this.openStreamInformation = openStreamInformation; + this.metadata = this.metadataStore.get(s3URI, openStreamInformation); this.objectKey = ObjectKey.builder().s3URI(s3URI).etag(metadata.getEtag()).build(); this.threadPool = threadPool; } @@ -140,7 +121,7 @@ public int read(long pos) throws IOException { StreamAttributes.physicalIORelativeTimestamp( System.nanoTime() - physicalIOBirth)) .build(), - () -> blobStore.get(this.objectKey, this.metadata, streamContext).read(pos)); + () -> blobStore.get(this.objectKey, this.metadata, openStreamInformation).read(pos)); } catch (Exception e) { handleOperationExceptions(e); throw e; @@ -177,7 +158,10 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException { StreamAttributes.physicalIORelativeTimestamp( System.nanoTime() - physicalIOBirth)) .build(), - () -> blobStore.get(objectKey, this.metadata, streamContext).read(buf, off, len, pos)); + () -> + blobStore + .get(objectKey, this.metadata, openStreamInformation) + .read(buf, off, len, pos)); } catch (Exception e) { handleOperationExceptions(e); throw e; @@ -213,7 +197,7 @@ public int readTail(byte[] buf, int off, int len) throws IOException { .build(), () -> blobStore - .get(objectKey, this.metadata, streamContext) + .get(objectKey, this.metadata, openStreamInformation) .read(buf, off, len, contentLength - len)); } catch (Exception e) { handleOperationExceptions(e); @@ -240,7 +224,7 @@ public IOPlanExecution execute(IOPlan ioPlan) { StreamAttributes.physicalIORelativeTimestamp( System.nanoTime() - physicalIOBirth)) .build(), - () -> blobStore.get(objectKey, this.metadata, streamContext).execute(ioPlan)); + () -> blobStore.get(objectKey, this.metadata, openStreamInformation).execute(ioPlan)); } @SuppressFBWarnings( @@ -250,7 +234,7 @@ public IOPlanExecution execute(IOPlan ioPlan) { @Override public void readVectored(List objectRanges, IntFunction allocate) throws IOException { - Blob blob = blobStore.get(objectKey, this.metadata, streamContext); + Blob blob = blobStore.get(objectKey, this.metadata, openStreamInformation); for (ObjectRange objectRange : objectRanges) { ByteBuffer buffer = allocate.apply(objectRange.getLength()); diff --git a/input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java b/input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java index 0ebe41ba..58b65240 100644 --- a/input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java +++ b/input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java @@ -34,6 +34,7 @@ import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; import software.amazon.s3.analyticsaccelerator.common.ObjectRange; import software.amazon.s3.analyticsaccelerator.request.*; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; public class InMemoryS3SeekableInputStream extends SeekableInputStream { @@ -80,19 +81,15 @@ public InMemoryObjectClient(int size) { } @Override - public CompletableFuture headObject(HeadRequest headRequest) { + public CompletableFuture headObject( + HeadRequest headRequest, OpenStreamInformation openStreamInformation) { return CompletableFuture.completedFuture( ObjectMetadata.builder().contentLength(size).etag(etag).build()); } - @Override - public CompletableFuture getObject(GetRequest getRequest) { - return getObject(getRequest, null); - } - @Override public CompletableFuture getObject( - GetRequest getRequest, StreamContext streamContext) { + GetRequest getRequest, OpenStreamInformation openStreamInformation) { int start = 0; int end = size - 1; diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java index f588b273..c324d705 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java @@ -120,10 +120,16 @@ void testCreateStreamWithContentLengthAndEtag() throws IOException { assertNotNull(inputStream); assertEquals( CONTENT_LENGTH, - s3SeekableInputStreamFactory.getObjectMetadataStore().get(s3URI).getContentLength()); + s3SeekableInputStreamFactory + .getObjectMetadataStore() + .get(s3URI, OpenStreamInformation.DEFAULT) + .getContentLength()); assertEquals( objectMetadata.getEtag(), - s3SeekableInputStreamFactory.getObjectMetadataStore().get(s3URI).getEtag()); + s3SeekableInputStreamFactory + .getObjectMetadataStore() + .get(s3URI, OpenStreamInformation.DEFAULT) + .getEtag()); } @Test diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java index 58c526d6..e2040235 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java @@ -41,6 +41,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -369,7 +370,12 @@ void testMultiThreadUsage() throws IOException, InterruptedException { try { PhysicalIO physicalIO = new PhysicalIOImpl( - s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); LogicalIO logicalIO = new ParquetLogicalIOImpl( TEST_OBJECT, @@ -465,7 +471,12 @@ private S3SeekableInputStream getTestStreamWithContent(String content, S3URI s3U new ParquetLogicalIOImpl( s3URI, new PhysicalIOImpl( - s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService), + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService), TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)), diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java index 28e927cc..d55115a4 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java @@ -30,6 +30,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; public class S3SeekableInputStreamTestBase { @@ -57,7 +58,12 @@ public class S3SeekableInputStreamTestBase { new ParquetLogicalIOImpl( TEST_OBJECT, new PhysicalIOImpl( - TEST_OBJECT, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService), + TEST_OBJECT, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService), TestTelemetry.DEFAULT, logicalIOConfiguration, new ParquetColumnPrefetchStore(logicalIOConfiguration)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java index 279cf462..6a123b0e 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java @@ -34,6 +34,7 @@ import software.amazon.s3.analyticsaccelerator.request.HeadRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -132,7 +133,7 @@ void testCloseDependencies() throws IOException { @Test void testMetadaWithZeroContentLength() throws IOException { ObjectClient mockClient = mock(ObjectClient.class); - when(mockClient.headObject(any(HeadRequest.class))) + when(mockClient.headObject(any(HeadRequest.class), any(OpenStreamInformation.class))) .thenReturn( CompletableFuture.completedFuture( ObjectMetadata.builder().contentLength(0).etag("random").build())); @@ -147,7 +148,12 @@ void testMetadaWithZeroContentLength() throws IOException { mock(Metrics.class)); PhysicalIOImpl physicalIO = new PhysicalIOImpl( - s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, mock(ExecutorService.class)); + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + mock(ExecutorService.class)); assertDoesNotThrow( () -> new ParquetLogicalIOImpl( diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetUtilsTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetUtilsTest.java index 2d5dd1e6..0fa4add1 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetUtilsTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetUtilsTest.java @@ -126,4 +126,50 @@ void testMergeRanges() { assertTrue(expectedRanges.containsAll(ParquetUtils.mergeRanges(ranges))); } + + @Test + void testConstructRowGroupsToPrefetch() { + List rowGroups = ParquetUtils.constructRowGroupsToPrefetch(); + assertEquals(1, rowGroups.size()); + assertEquals(0, rowGroups.get(0)); + } + + @Test + void testMergeRangesWithSingleRange() { + List ranges = new ArrayList<>(); + ranges.add(new Range(100, 200)); + + List mergedRanges = ParquetUtils.mergeRanges(ranges); + assertEquals(1, mergedRanges.size()); + assertEquals(100, mergedRanges.get(0).getStart()); + assertEquals(200, mergedRanges.get(0).getEnd()); + } + + @Test + void testMergeRangesWithNonConsecutiveRanges() { + List ranges = new ArrayList<>(); + ranges.add(new Range(100, 200)); + ranges.add(new Range(300, 400)); + ranges.add(new Range(600, 700)); + + List mergedRanges = ParquetUtils.mergeRanges(ranges); + assertEquals(3, mergedRanges.size()); + assertEquals(100, mergedRanges.get(0).getStart()); + assertEquals(200, mergedRanges.get(0).getEnd()); + assertEquals(300, mergedRanges.get(1).getStart()); + assertEquals(400, mergedRanges.get(1).getEnd()); + assertEquals(600, mergedRanges.get(2).getStart()); + assertEquals(700, mergedRanges.get(2).getEnd()); + } + + @Test + void testGetFileTailPrefetchRangesSmallFile() { + long smallFileSize = ONE_MB; + List ranges = + ParquetUtils.getFileTailPrefetchRanges(LogicalIOConfiguration.DEFAULT, 0, smallFileSize); + + assertEquals(1, ranges.size()); + assertEquals(0, ranges.get(0).getStart()); + assertEquals(smallFileSize - 1, ranges.get(0).getEnd()); + } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStoreTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStoreTest.java index 2d61a846..64440d23 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStoreTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobStoreTest.java @@ -40,7 +40,6 @@ import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.request.Range; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.*; import software.amazon.s3.analyticsaccelerator.util.BlockKey; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; @@ -52,7 +51,7 @@ justification = "We mean to pass nulls to checks") public class BlobStoreTest { private static final String TEST_DATA = "test-data"; - private static final String ETAG = "random"; + private static final String ETAG = "RANDOM"; private static final ObjectMetadata objectMetadata = ObjectMetadata.builder().contentLength(TEST_DATA.length()).etag(ETAG).build(); @@ -71,7 +70,7 @@ public class BlobStoreTest { void setUp() throws IOException { ObjectClient objectClient = new FakeObjectClient("test-data"); MetadataStore metadataStore = mock(MetadataStore.class); - when(metadataStore.get(any())) + when(metadataStore.get(any(), any())) .thenReturn(ObjectMetadata.builder().contentLength(TEST_DATA.length()).etag(ETAG).build()); Metrics metrics = new Metrics(); Map configMap = new HashMap<>(); @@ -152,7 +151,7 @@ void testCreateBoundaries() { @Test public void testGetReturnsReadableBlob() throws IOException { // When: a Blob is asked for - Blob blob = blobStore.get(objectKey, objectMetadata, mock(StreamContext.class)); + Blob blob = blobStore.get(objectKey, objectMetadata, mock(OpenStreamInformation.class)); // Then: byte[] b = new byte[TEST_DATA.length()]; @@ -164,7 +163,7 @@ public void testGetReturnsReadableBlob() throws IOException { @Test void testEvictKey_ExistingKey() { // Setup - blobStore.get(objectKey, objectMetadata, mock(StreamContext.class)); + blobStore.get(objectKey, objectMetadata, mock(OpenStreamInformation.class)); // Test boolean result = blobStore.evictKey(objectKey); @@ -190,7 +189,7 @@ void testMemoryUsageTracking() throws IOException { assertEquals(0, blobStore.getMetrics().get(MetricKey.MEMORY_USAGE)); // When: Reading data which causes memory allocation - Blob blob = blobStore.get(objectKey, objectMetadata, mock(StreamContext.class)); + Blob blob = blobStore.get(objectKey, objectMetadata, mock(OpenStreamInformation.class)); byte[] b = new byte[TEST_DATA.length()]; blob.read(b, 0, b.length, 0); @@ -205,7 +204,7 @@ void testCacheHitsAndMisses() throws IOException { assertEquals(0, blobStore.getMetrics().get(MetricKey.CACHE_HIT)); assertEquals(0, blobStore.getMetrics().get(MetricKey.CACHE_MISS)); - Blob blob = blobStore.get(objectKey, objectMetadata, mock(StreamContext.class)); + Blob blob = blobStore.get(objectKey, objectMetadata, mock(OpenStreamInformation.class)); byte[] b = new byte[TEST_DATA.length()]; blob.read(b, 0, b.length, 0); @@ -235,8 +234,8 @@ void testMemoryUsageAfterEviction() throws IOException, InterruptedException { ObjectKey key3 = ObjectKey.builder().s3URI(S3URI.of("test", "test3")).etag(ETAG).build(); // When: Add blobs up to capacity - Blob blob1 = blobStore.get(key1, objectMetadata, mock(StreamContext.class)); - Blob blob2 = blobStore.get(key2, objectMetadata, mock(StreamContext.class)); + Blob blob1 = blobStore.get(key1, objectMetadata, mock(OpenStreamInformation.class)); + Blob blob2 = blobStore.get(key2, objectMetadata, mock(OpenStreamInformation.class)); // Force data loading byte[] data = new byte[TEST_DATA.length()]; @@ -247,7 +246,7 @@ void testMemoryUsageAfterEviction() throws IOException, InterruptedException { long initialMemoryUsage = blobStore.getMetrics().get(MetricKey.MEMORY_USAGE); // Then: Adding one more blob should trigger eviction - Blob blob3 = blobStore.get(key3, objectMetadata, mock(StreamContext.class)); + Blob blob3 = blobStore.get(key3, objectMetadata, mock(OpenStreamInformation.class)); blob3.read(data, 0, data.length, 0); Thread.sleep(10); @@ -282,7 +281,8 @@ void testConcurrentMemoryUpdates() throws Exception { ObjectMetadata threadMetadata = ObjectMetadata.builder().contentLength(bytesPerThread).etag(ETAG).build(); - Blob blob = blobStore.get(threadKey, threadMetadata, mock(StreamContext.class)); + Blob blob = + blobStore.get(threadKey, threadMetadata, mock(OpenStreamInformation.class)); byte[] b = new byte[bytesPerThread]; blob.read(b, 0, b.length, 0); } catch (IOException e) { @@ -305,8 +305,8 @@ void testClose() { ObjectKey key1 = ObjectKey.builder().s3URI(S3URI.of("test", "test1")).etag(ETAG).build(); ObjectKey key2 = ObjectKey.builder().s3URI(S3URI.of("test", "test2")).etag(ETAG).build(); - Blob blob1 = blobStore.get(key1, objectMetadata, mock(StreamContext.class)); - Blob blob2 = blobStore.get(key2, objectMetadata, mock(StreamContext.class)); + Blob blob1 = blobStore.get(key1, objectMetadata, mock(OpenStreamInformation.class)); + Blob blob2 = blobStore.get(key2, objectMetadata, mock(OpenStreamInformation.class)); byte[] data = new byte[TEST_DATA.length()]; try { @@ -343,10 +343,10 @@ void testBlobCount() { ObjectKey key2 = ObjectKey.builder().s3URI(S3URI.of("test", "test2")).etag(ETAG).build(); // Get blobs (which adds them to the map) - blobStore.get(key1, objectMetadata, mock(StreamContext.class)); + blobStore.get(key1, objectMetadata, mock(OpenStreamInformation.class)); assertEquals(1, blobStore.blobCount(), "Blob count should be 1 after adding first blob"); - blobStore.get(key2, objectMetadata, mock(StreamContext.class)); + blobStore.get(key2, objectMetadata, mock(OpenStreamInformation.class)); assertEquals(2, blobStore.blobCount(), "Blob count should be 2 after adding second blob"); // Test count after eviction @@ -366,7 +366,7 @@ void testCleanupNotTriggeredBelowCapacity() { // Create a blob to potentially clean ObjectKey key = ObjectKey.builder().s3URI(S3URI.of("test", "test1")).etag(ETAG).build(); - mockBlobStore.get(key, objectMetadata, mock(StreamContext.class)); + mockBlobStore.get(key, objectMetadata, mock(OpenStreamInformation.class)); // Attempt cleanup mockBlobStore.scheduleCleanupIfNotRunning(); @@ -384,7 +384,7 @@ void testCleanupTriggeredAboveCapacity() throws IOException { // Create and load a blob ObjectKey key = ObjectKey.builder().s3URI(S3URI.of("test", "test1")).etag(ETAG).build(); - mockBlobStore.get(key, objectMetadata, mock(StreamContext.class)); + mockBlobStore.get(key, objectMetadata, mock(OpenStreamInformation.class)); // Trigger cleanup mockBlobStore.scheduleCleanupIfNotRunning(); @@ -468,7 +468,7 @@ void testCleanupWithException() { doThrow(new RuntimeException("Cleanup failed")).when(mockBlob).asyncCleanup(); // Add mock blob to store - mockBlobStore.get(key, objectMetadata, mock(StreamContext.class)); + mockBlobStore.get(key, objectMetadata, mock(OpenStreamInformation.class)); // Attempt cleanup - should handle exception gracefully assertDoesNotThrow(() -> mockBlobStore.scheduleCleanupIfNotRunning()); @@ -516,7 +516,8 @@ void testConcurrentCleanupAndBlobOperations() throws InterruptedException { .build(); // Perform operations while cleanup might be running - Blob blob = blobStore.get(threadKey, objectMetadata, mock(StreamContext.class)); + Blob blob = + blobStore.get(threadKey, objectMetadata, mock(OpenStreamInformation.class)); byte[] data = new byte[TEST_DATA.length()]; blob.read(data, 0, data.length, 0); @@ -555,7 +556,7 @@ void testRapidEvictionAndRetrieval() throws IOException { for (int i = 0; i < 10000; i++) { ObjectKey key = ObjectKey.builder().s3URI(S3URI.of("test", "test" + i)).etag(ETAG).build(); keys.add(key); - blobs.add(blobStore.get(key, objectMetadata, mock(StreamContext.class))); + blobs.add(blobStore.get(key, objectMetadata, mock(OpenStreamInformation.class))); } // Force data loading for all blobs @@ -567,7 +568,7 @@ void testRapidEvictionAndRetrieval() throws IOException { // Evict keys while simultaneously retrieving them for (int i = 0; i < keys.size(); i++) { blobStore.evictKey(keys.get(i)); - Blob newBlob = blobStore.get(keys.get(i), objectMetadata, mock(StreamContext.class)); + Blob newBlob = blobStore.get(keys.get(i), objectMetadata, mock(OpenStreamInformation.class)); byte[] newData = new byte[TEST_DATA.length()]; newBlob.read(newData, 0, newData.length, 0); assertEquals(TEST_DATA, new String(newData, StandardCharsets.UTF_8)); @@ -582,7 +583,7 @@ void testCleanupUnderMemoryPressure() throws IOException, InterruptedException { // Create enough blobs to exceed the configured capacity for (int i = 0; i < 10000; i++) { ObjectKey key = ObjectKey.builder().s3URI(S3URI.of("test", "test" + i)).etag(ETAG).build(); - blobs.add(blobStore.get(key, objectMetadata, mock(StreamContext.class))); + blobs.add(blobStore.get(key, objectMetadata, mock(OpenStreamInformation.class))); } // Force data loading to trigger memory pressure @@ -628,7 +629,8 @@ void testConcurrentBlobRetrievalAndCleanup() throws InterruptedException { // Perform multiple get operations for (int j = 0; j < 5; j++) { Blob blob = - blobStore.get(threadKey, objectMetadata, mock(StreamContext.class)); + blobStore.get( + threadKey, objectMetadata, mock(OpenStreamInformation.class)); byte[] data = new byte[TEST_DATA.length()]; blob.read(data, 0, data.length, 0); @@ -678,7 +680,7 @@ void testCleanupWithActiveReads() throws InterruptedException { ObjectKey key = ObjectKey.builder().s3URI(S3URI.of("test", "testLongRead")).etag(ETAG).build(); - Blob blob = blobStore.get(key, objectMetadata, mock(StreamContext.class)); + Blob blob = blobStore.get(key, objectMetadata, mock(OpenStreamInformation.class)); byte[] data = new byte[TEST_DATA.length()]; // Signal that read has started diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java index 6c198529..19c842f8 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlobTest.java @@ -35,6 +35,7 @@ import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -42,7 +43,7 @@ justification = "We mean to pass nulls to checks") public class BlobTest { private static final S3URI TEST_URI = S3URI.of("foo", "bar"); - private static final String ETAG = "RandomString"; + private static final String ETAG = "RANDOM"; private static final ObjectKey objectKey = ObjectKey.builder().s3URI(TEST_URI).etag(ETAG).build(); private static final String TEST_DATA = "test-data-0123456789"; private static final int OBJECT_SIZE = 100; @@ -177,7 +178,8 @@ private Blob getTestBlob(String data) { TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); return new Blob(objectKey, mockMetadataStore, blockManager, TestTelemetry.DEFAULT); } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java index 01587fa8..ebcf4f3f 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManagerTest.java @@ -38,6 +38,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.request.*; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -61,7 +62,7 @@ void testConstructorInitialization() { PhysicalIOConfiguration configuration = mock(PhysicalIOConfiguration.class); Metrics aggregatingMetrics = new Metrics(); BlobStoreIndexCache indexCache = mock(BlobStoreIndexCache.class); - StreamContext streamContext = mock(StreamContext.class); + OpenStreamInformation openStreamInformation = mock(OpenStreamInformation.class); // Act BlockManager blockManager = @@ -73,7 +74,7 @@ void testConstructorInitialization() { configuration, aggregatingMetrics, indexCache, - streamContext); + openStreamInformation); // Assert assertNotNull(blockManager, "BlockManager should not be null"); @@ -91,7 +92,8 @@ void testCreateBoundaries() { mock(Telemetry.class), mock(PhysicalIOConfiguration.class), mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( NullPointerException.class, () -> @@ -102,7 +104,8 @@ void testCreateBoundaries() { mock(Telemetry.class), mock(PhysicalIOConfiguration.class), mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( NullPointerException.class, () -> @@ -113,7 +116,8 @@ void testCreateBoundaries() { mock(Telemetry.class), mock(PhysicalIOConfiguration.class), mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( NullPointerException.class, () -> @@ -124,7 +128,8 @@ void testCreateBoundaries() { null, mock(PhysicalIOConfiguration.class), mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( NullPointerException.class, () -> @@ -135,7 +140,8 @@ void testCreateBoundaries() { mock(Telemetry.class), null, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); } @Test @@ -375,7 +381,8 @@ private BlockManager getTestBlockManager( TestTelemetry.DEFAULT, configuration, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); } @Test diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java index 438a19a9..6c1c8050 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockStoreTest.java @@ -39,7 +39,7 @@ public class BlockStoreTest { private static final S3URI TEST_URI = S3URI.of("foo", "bar"); - private static final String ETAG = "RandomString"; + private static final String ETAG = "RANDOM"; private static final ObjectKey objectKey = ObjectKey.builder().s3URI(TEST_URI).etag(ETAG).build(); private static final int OBJECT_SIZE = 100; private static final long DEFAULT_READ_TIMEOUT = 120_000; @@ -70,7 +70,8 @@ public void test__blockStore__getBlockAfterAddBlock() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); // Then: getBlock can retrieve the same block Optional b = blockStore.getBlock(4); @@ -106,7 +107,8 @@ public void test__blockStore__findNextMissingByteCorrect() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); blockStore.add( blockKey2, new Block( @@ -118,7 +120,8 @@ public void test__blockStore__findNextMissingByteCorrect() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); blockStore.add( blockKey3, new Block( @@ -130,7 +133,8 @@ public void test__blockStore__findNextMissingByteCorrect() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); // When & Then: we query for the next missing byte, the result is correct assertEquals(OptionalLong.of(0), blockStore.findNextMissingByte(0)); @@ -169,7 +173,8 @@ public void test__blockStore__findNextAvailableByteCorrect() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); blockStore.add( blockKey2, new Block( @@ -181,7 +186,8 @@ public void test__blockStore__findNextAvailableByteCorrect() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); blockStore.add( blockKey3, new Block( @@ -193,7 +199,8 @@ public void test__blockStore__findNextAvailableByteCorrect() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); // When & Then: we query for the next available byte, the result is correct assertEquals(OptionalLong.of(2), blockStore.findNextLoadedByte(0)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java index e9d9ed3d..67b025a0 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockTest.java @@ -39,7 +39,7 @@ @SuppressWarnings("unchecked") public class BlockTest { private static final S3URI TEST_URI = S3URI.of("foo", "bar"); - private static final String ETAG = "RandomString"; + private static final String ETAG = "RANDOM"; private static final ObjectKey objectKey = ObjectKey.builder().s3URI(TEST_URI).etag(ETAG).build(); private static final long DEFAULT_READ_TIMEOUT = 120_000; private static final int DEFAULT_READ_RETRY_COUNT = 20; @@ -61,7 +61,8 @@ public void testConstructor() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); assertNotNull(block); } @@ -88,7 +89,8 @@ void testReadWithValidPosition() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mockMetrics, - mockIndexCache); + mockIndexCache, + OpenStreamInformation.DEFAULT); // Test when data is not in cache when(mockIndexCache.contains(blockKey)).thenReturn(false); @@ -124,7 +126,8 @@ public void testSingleByteReadReturnsCorrectByte() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); // When: bytes are requested from the block int r1 = block.read(0); @@ -153,7 +156,8 @@ public void testBufferedReadReturnsCorrectBytes() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); // When: bytes are requested from the block byte[] b1 = new byte[4]; @@ -248,7 +252,8 @@ void testBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( IllegalArgumentException.class, () -> @@ -261,7 +266,8 @@ void testBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( IllegalArgumentException.class, () -> @@ -274,7 +280,8 @@ void testBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( IllegalArgumentException.class, () -> @@ -287,7 +294,8 @@ void testBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); assertThrows( IllegalArgumentException.class, () -> @@ -300,7 +308,8 @@ void testBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); } @SneakyThrows @@ -319,7 +328,8 @@ void testReadBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); assertThrows(IllegalArgumentException.class, () -> block.read(-10)); assertThrows(NullPointerException.class, () -> block.read(null, 0, 3, 1)); assertThrows(IllegalArgumentException.class, () -> block.read(b, -5, 3, 1)); @@ -342,7 +352,8 @@ void testContains() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); assertTrue(block.contains(0)); assertFalse(block.contains(TEST_DATA.length() + 1)); } @@ -362,7 +373,8 @@ void testContainsBoundaries() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); assertThrows(IllegalArgumentException.class, () -> block.contains(-1)); } @@ -383,7 +395,8 @@ void testReadTimeoutAndRetry() throws IOException { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); assertThrows(IOException.class, () -> block.read(4)); } @@ -402,7 +415,8 @@ void testClose() { DEFAULT_READ_TIMEOUT, DEFAULT_READ_RETRY_COUNT, mock(Metrics.class), - mock(BlobStoreIndexCache.class)); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT); block.close(); block.close(); } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/IOPlannerTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/IOPlannerTest.java index fb62c728..bdda5b95 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/IOPlannerTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/IOPlannerTest.java @@ -37,7 +37,7 @@ @SuppressWarnings("unchecked") public class IOPlannerTest { private static final S3URI TEST_URI = S3URI.of("foo", "bar"); - private static final String ETAG = "RandomString"; + private static final String ETAG = "RANDOM"; private static final ObjectKey objectKey = ObjectKey.builder().s3URI(TEST_URI).etag(ETAG).build(); @Test @@ -106,7 +106,8 @@ public void testPlanReadDoesNotDoubleRead() throws IOException { 120_000, 20, mock(Metrics.class), - mock(BlobStoreIndexCache.class))); + mock(BlobStoreIndexCache.class), + OpenStreamInformation.DEFAULT)); IOPlanner ioPlanner = new IOPlanner(blockStore); // When: a read plan is requested for a range (0, 400) diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStoreTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStoreTest.java index 3face6a8..e5bac702 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStoreTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/MetadataStoreTest.java @@ -33,6 +33,7 @@ import software.amazon.s3.analyticsaccelerator.request.HeadRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; public class MetadataStoreTest { @@ -42,19 +43,19 @@ public void test__get__cacheWorks() throws IOException { // Given: a MetadataStore with caching turned on ObjectClient objectClient = mock(ObjectClient.class); ObjectMetadata objectMetadata = ObjectMetadata.builder().etag("random").build(); - when(objectClient.headObject(any())) + when(objectClient.headObject(any(), any())) .thenReturn(CompletableFuture.completedFuture(objectMetadata)); MetadataStore metadataStore = new MetadataStore(objectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); S3URI key = S3URI.of("foo", "bar"); // When: get(..) is called multiple times - metadataStore.get(key); - metadataStore.get(key); - metadataStore.get(key); + metadataStore.get(key, OpenStreamInformation.DEFAULT); + metadataStore.get(key, OpenStreamInformation.DEFAULT); + metadataStore.get(key, OpenStreamInformation.DEFAULT); // Then: object store was accessed only once - verify(objectClient, times(1)).headObject(any()); + verify(objectClient, times(1)).headObject(any(), any()); } @Test @@ -68,24 +69,26 @@ public void test__close__closesAllElements() HeadRequest h1 = HeadRequest.builder().s3Uri(S3URI.of("b", "key1")).build(); HeadRequest h2 = HeadRequest.builder().s3Uri(S3URI.of("b", "key2")).build(); + OpenStreamInformation openStreamInformation = OpenStreamInformation.DEFAULT; CompletableFuture future = mock(CompletableFuture.class); when(future.isDone()).thenReturn(false); when(future.cancel(anyBoolean())).thenThrow(new RuntimeException("something horrible")); - when(objectClient.headObject(h1)).thenReturn(future); + when(objectClient.headObject(h1, openStreamInformation)).thenReturn(future); CompletableFuture objectMetadataCompletableFuture = mock(CompletableFuture.class); - when(objectClient.headObject(h2)).thenReturn(objectMetadataCompletableFuture); + when(objectClient.headObject(h2, openStreamInformation)) + .thenReturn(objectMetadataCompletableFuture); MetadataStore metadataStore = new MetadataStore(objectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); // When: MetadataStore is closed - metadataStore.get(S3URI.of("b", "key1")); - metadataStore.get(S3URI.of("b", "key2")); + metadataStore.get(S3URI.of("b", "key1"), openStreamInformation); + metadataStore.get(S3URI.of("b", "key2"), openStreamInformation); metadataStore.close(); // Then: nothing has thrown, all futures were cancelled @@ -96,7 +99,7 @@ public void test__close__closesAllElements() void testEvictKey_ExistingKey() { // Setup ObjectClient objectClient = mock(ObjectClient.class); - when(objectClient.headObject(any())) + when(objectClient.headObject(any(), any())) .thenReturn(CompletableFuture.completedFuture(mock(ObjectMetadata.class))); MetadataStore metadataStore = new MetadataStore(objectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java index 61fcdd7b..cb23a3bb 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImplTest.java @@ -45,10 +45,10 @@ import software.amazon.s3.analyticsaccelerator.io.physical.data.BlobStore; import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; import software.amazon.s3.analyticsaccelerator.util.MetricKey; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -71,6 +71,7 @@ void testConstructorThrowsOnNullArgument() { mock(MetadataStore.class), mock(BlobStore.class), TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, executorService); }); @@ -82,7 +83,7 @@ void testConstructorThrowsOnNullArgument() { null, mock(BlobStore.class), TestTelemetry.DEFAULT, - mock(StreamContext.class), + mock(OpenStreamInformation.class), executorService); }); @@ -94,7 +95,7 @@ void testConstructorThrowsOnNullArgument() { mock(MetadataStore.class), null, TestTelemetry.DEFAULT, - mock(StreamContext.class), + mock(OpenStreamInformation.class), executorService); }); @@ -106,7 +107,7 @@ void testConstructorThrowsOnNullArgument() { mock(MetadataStore.class), mock(BlobStore.class), null, - mock(StreamContext.class), + mock(OpenStreamInformation.class), executorService); }); @@ -126,28 +127,48 @@ void testConstructorThrowsOnNullArgument() { NullPointerException.class, () -> { new PhysicalIOImpl( - s3URI, null, mock(BlobStore.class), TestTelemetry.DEFAULT, executorService); + s3URI, + null, + mock(BlobStore.class), + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); }); assertThrows( NullPointerException.class, () -> { new PhysicalIOImpl( - s3URI, mock(MetadataStore.class), null, TestTelemetry.DEFAULT, executorService); + s3URI, + mock(MetadataStore.class), + null, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); }); assertThrows( NullPointerException.class, () -> { new PhysicalIOImpl( - s3URI, mock(MetadataStore.class), mock(BlobStore.class), null, executorService); + s3URI, + mock(MetadataStore.class), + mock(BlobStore.class), + null, + OpenStreamInformation.DEFAULT, + executorService); }); assertThrows( NullPointerException.class, () -> { new PhysicalIOImpl( - s3URI, mock(MetadataStore.class), mock(BlobStore.class), TestTelemetry.DEFAULT, null); + s3URI, + mock(MetadataStore.class), + mock(BlobStore.class), + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + null); }); } @@ -165,7 +186,13 @@ public void test__readSingleByte_isCorrect() throws IOException { PhysicalIOConfiguration.DEFAULT, mock(Metrics.class)); PhysicalIOImpl physicalIOImplV2 = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); // When: we read // Then: returned data is correct @@ -190,7 +217,13 @@ public void test__regression_singleByteStream() throws IOException { PhysicalIOConfiguration.DEFAULT, mock(Metrics.class)); PhysicalIOImpl physicalIOImplV2 = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); // When: we read // Then: returned data is correct @@ -210,7 +243,13 @@ void testReadWithBuffer() throws IOException { PhysicalIOConfiguration.DEFAULT, mock(Metrics.class)); PhysicalIOImpl physicalIOImplV2 = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); byte[] buffer = new byte[5]; assertEquals(5, physicalIOImplV2.read(buffer, 0, 5, 5)); @@ -229,7 +268,13 @@ void testReadTail() throws IOException { PhysicalIOConfiguration.DEFAULT, mock(Metrics.class)); PhysicalIOImpl physicalIOImplV2 = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); byte[] buffer = new byte[5]; assertEquals(5, physicalIOImplV2.readTail(buffer, 0, 5)); assertEquals(1, blobStore.blobCount()); @@ -268,11 +313,17 @@ public void test_FailureEvictsObjectsAsExpected() throws IOException { new BlobStore( client, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, mock(Metrics.class)); PhysicalIOImpl physicalIOImplV2 = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); assertThrows(IOException.class, () -> physicalIOImplV2.read(0)); assertEquals(0, blobStore.blobCount()); - assertThrows(Exception.class, () -> metadataStore.get(s3URI)); + assertThrows(Exception.class, () -> metadataStore.get(s3URI, OpenStreamInformation.DEFAULT)); } @SuppressWarnings("unchecked") @@ -297,11 +348,17 @@ public void test_FailureEvictsObjectsAsExpected_WhenSDKClientGetsStuck() throws new BlobStore( client, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, mock(Metrics.class)); PhysicalIOImpl physicalIOImplV2 = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); assertThrows(IOException.class, () -> physicalIOImplV2.read(0)); assertEquals(0, blobStore.blobCount()); - assertThrows(Exception.class, () -> metadataStore.get(s3URI)); + assertThrows(Exception.class, () -> metadataStore.get(s3URI, OpenStreamInformation.DEFAULT)); } @Test @@ -316,7 +373,13 @@ void testClose_WithoutEviction() throws IOException { new BlobStore( fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, metrics); PhysicalIOImpl physicalIO = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); // When: Read data to ensure blob is created byte[] buffer = new byte[4]; @@ -350,7 +413,12 @@ void testCloseWithEviction() throws IOException { PhysicalIOImpl physicalIO = new PhysicalIOImpl( - s3URI, metadataStore, mockBlobStore, TestTelemetry.DEFAULT, executorService); + s3URI, + metadataStore, + mockBlobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); ObjectKey objectKey = ObjectKey.builder().s3URI(s3URI).etag(fakeObjectClient.getEtag()).build(); // When physicalIO.close(true); @@ -370,7 +438,13 @@ void testPartialRead() throws IOException { new BlobStore( fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, metrics); PhysicalIOImpl physicalIO = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); // When: Read partial data byte[] buffer = new byte[4]; @@ -401,7 +475,13 @@ private void readVectored(IntFunction allocate) throws IOException { new BlobStore( fakeObjectClient, TestTelemetry.DEFAULT, PhysicalIOConfiguration.DEFAULT, metrics); PhysicalIOImpl physicalIO = - new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT, executorService); + new PhysicalIOImpl( + s3URI, + metadataStore, + blobStore, + TestTelemetry.DEFAULT, + OpenStreamInformation.DEFAULT, + executorService); List objectRanges = new ArrayList<>(); objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2, 3)); diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeObjectClient.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeObjectClient.java index 244fdfce..07813d46 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeObjectClient.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeObjectClient.java @@ -55,26 +55,22 @@ public FakeObjectClient(String data) { } @Override - public CompletableFuture headObject(HeadRequest headRequest) { + public CompletableFuture headObject( + HeadRequest headRequest, OpenStreamInformation openStreamInformation) { headRequestCount.incrementAndGet(); return CompletableFuture.completedFuture( ObjectMetadata.builder().contentLength(this.content.length()).etag(this.etag).build()); } @Override - public CompletableFuture getObject(GetRequest getRequest) { + public CompletableFuture getObject( + GetRequest getRequest, OpenStreamInformation openStreamInformation) { if (!getRequest.getEtag().equals(this.etag)) { throw S3Exception.builder() .message("At least one of the pre-conditions you specified did not hold") .statusCode(412) .build(); } - return getObject(getRequest, null); - } - - @Override - public CompletableFuture getObject( - GetRequest getRequest, StreamContext streamContext) { getRequestCount.incrementAndGet(); requestedRanges.add(getRequest.getRange()); return CompletableFuture.completedFuture( diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeStuckObjectClient.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeStuckObjectClient.java index 3650e6b5..dcf9afac 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeStuckObjectClient.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/FakeStuckObjectClient.java @@ -19,7 +19,6 @@ import java.util.concurrent.TimeoutException; import software.amazon.s3.analyticsaccelerator.request.GetRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectContent; -import software.amazon.s3.analyticsaccelerator.request.StreamContext; public class FakeStuckObjectClient extends FakeObjectClient { @@ -34,7 +33,7 @@ public FakeStuckObjectClient(String data) { @Override public CompletableFuture getObject( - GetRequest getRequest, StreamContext streamContext) { + GetRequest getRequest, OpenStreamInformation openStreamInformation) { CompletableFuture failedFuture = new CompletableFuture<>(); failedFuture.completeExceptionally(new TimeoutException("Request timed out")); return failedFuture; diff --git a/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java b/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java index 10bed072..f677fb2a 100644 --- a/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java +++ b/object-client/src/main/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClient.java @@ -33,6 +33,7 @@ import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.exceptions.ExceptionHandler; import software.amazon.s3.analyticsaccelerator.request.*; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; /** Object client, based on AWS SDK v2 */ @@ -109,7 +110,8 @@ public void close() { } @Override - public CompletableFuture headObject(HeadRequest headRequest) { + public CompletableFuture headObject( + HeadRequest headRequest, OpenStreamInformation openStreamInformation) { HeadObjectRequest.Builder builder = HeadObjectRequest.builder() .bucket(headRequest.getS3Uri().getBucket()) @@ -139,14 +141,9 @@ public CompletableFuture headObject(HeadRequest headRequest) { .exceptionally(handleException(headRequest.getS3Uri())); } - @Override - public CompletableFuture getObject(GetRequest getRequest) { - return getObject(getRequest, null); - } - @Override public CompletableFuture getObject( - GetRequest getRequest, StreamContext streamContext) { + GetRequest getRequest, OpenStreamInformation openStreamInformation) { GetObjectRequest.Builder builder = GetObjectRequest.builder() @@ -158,8 +155,9 @@ public CompletableFuture getObject( builder.range(range); final String referrerHeader; - if (streamContext != null) { - referrerHeader = streamContext.modifyAndBuildReferrerHeader(getRequest); + if (openStreamInformation.getStreamContext() != null) { + referrerHeader = + openStreamInformation.getStreamContext().modifyAndBuildReferrerHeader(getRequest); } else { referrerHeader = getRequest.getReferrer().toString(); } diff --git a/object-client/src/test/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClientTest.java b/object-client/src/test/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClientTest.java index 24f3abdc..2fbb04ec 100644 --- a/object-client/src/test/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClientTest.java +++ b/object-client/src/test/java/software/amazon/s3/analyticsaccelerator/S3SdkObjectClientTest.java @@ -55,6 +55,7 @@ import software.amazon.s3.analyticsaccelerator.request.Range; import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.request.Referrer; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -165,7 +166,11 @@ void testHeadObject() { try (S3AsyncClient s3AsyncClient = createMockClient()) { S3SdkObjectClient client = new S3SdkObjectClient(s3AsyncClient); ObjectMetadata metadata = - client.headObject(HeadRequest.builder().s3Uri(S3URI.of("bucket", "key")).build()).join(); + client + .headObject( + HeadRequest.builder().s3Uri(S3URI.of("bucket", "key")).build(), + OpenStreamInformation.DEFAULT) + .join(); assertEquals(metadata, ObjectMetadata.builder().contentLength(42).etag(ETAG).build()); } } @@ -182,7 +187,8 @@ void testGetObjectWithDifferentEtagsThrowsError() { .range(new Range(0, 20)) .etag(ETAG) .referrer(new Referrer("bytes=0-20", ReadMode.SYNC)) - .build())); + .build(), + OpenStreamInformation.DEFAULT)); assertThrows( S3Exception.class, () -> @@ -193,7 +199,8 @@ void testGetObjectWithDifferentEtagsThrowsError() { .range(new Range(0, 20)) .etag("ANOTHER ONE") .referrer(new Referrer("bytes=0-20", ReadMode.SYNC)) - .build()) + .build(), + OpenStreamInformation.DEFAULT) .get()); } } @@ -210,7 +217,8 @@ void testGetObjectWithRange() { .range(new Range(0, 20)) .etag(ETAG) .referrer(new Referrer("bytes=0-20", ReadMode.SYNC)) - .build())); + .build(), + OpenStreamInformation.DEFAULT)); } } @@ -220,7 +228,9 @@ void testGetObjectWithAuditHeaders() { S3SdkObjectClient client = new S3SdkObjectClient(mockS3AsyncClient); + OpenStreamInformation openStreamInformation = mock(OpenStreamInformation.class); StreamContext mockStreamContext = mock(StreamContext.class); + when(openStreamInformation.getStreamContext()).thenReturn(mockStreamContext); when(mockStreamContext.modifyAndBuildReferrerHeader(any())).thenReturn("audit-referrer-value"); GetRequest getRequest = @@ -231,7 +241,7 @@ void testGetObjectWithAuditHeaders() { .referrer(new Referrer("bytes=0-20", ReadMode.SYNC)) .build(); - client.getObject(getRequest, mockStreamContext); + client.getObject(getRequest, openStreamInformation); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); @@ -264,7 +274,7 @@ void testGetObjectWithoutAuditHeaders() { .referrer(new Referrer("original-referrer", ReadMode.SYNC)) .build(); - client.getObject(getRequest, null); + client.getObject(getRequest, OpenStreamInformation.DEFAULT); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); @@ -288,7 +298,9 @@ void testGetObjectWithoutAuditHeaders() { void testObjectClientClose() { try (S3AsyncClient s3AsyncClient = createMockClient()) { try (S3SdkObjectClient client = new S3SdkObjectClient(s3AsyncClient)) { - client.headObject(HeadRequest.builder().s3Uri(S3URI.of("bucket", "key")).build()); + client.headObject( + HeadRequest.builder().s3Uri(S3URI.of("bucket", "key")).build(), + OpenStreamInformation.DEFAULT); } verify(s3AsyncClient, times(1)).close(); } @@ -305,7 +317,8 @@ void testHeadObjectExceptions(Exception exception) { S3SdkObjectClient client = new S3SdkObjectClient(mockS3AsyncClient); HeadRequest headRequest = HeadRequest.builder().s3Uri(TEST_URI).build(); - CompletableFuture future = client.headObject(headRequest); + CompletableFuture future = + client.headObject(headRequest, OpenStreamInformation.DEFAULT); assertObjectClientExceptions(exception, future); } @@ -329,7 +342,8 @@ void testGetObjectExceptions(Exception exception) { .range(new Range(0, 20)) .referrer(new Referrer("original-referrer", ReadMode.SYNC)) .build(); - CompletableFuture future = client.getObject(getRequest); + CompletableFuture future = + client.getObject(getRequest, OpenStreamInformation.DEFAULT); assertObjectClientExceptions(exception, future); }