diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java index 74719893..a411f735 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java @@ -23,18 +23,19 @@ */ @AllArgsConstructor public enum ReadMode { - SYNC(true, false), - ASYNC(true, false), - SMALL_OBJECT_PREFETCH(true, false), - SEQUENTIAL_FILE_PREFETCH(true, false), - DICTIONARY_PREFETCH(false, false), - COLUMN_PREFETCH(false, true), - REMAINING_COLUMN_PREFETCH(false, false), - PREFETCH_TAIL(false, false), - READ_VECTORED(false, true); + SYNC(true, false, false), + ASYNC(true, false, true), + SMALL_OBJECT_PREFETCH(true, false, true), + SEQUENTIAL_FILE_PREFETCH(true, false, true), + DICTIONARY_PREFETCH(false, false, true), + COLUMN_PREFETCH(false, true, true), + REMAINING_COLUMN_PREFETCH(false, false, true), + PREFETCH_TAIL(false, false, true), + READ_VECTORED(false, true, false); private final boolean allowRequestExtension; private final boolean coalesceRequests; + private final boolean isPrefetch; /** * Should requests be extended for this read mode? @@ -60,4 +61,13 @@ public boolean allowRequestExtension() { public boolean coalesceRequests() { return coalesceRequests; } + + /** + * Is the read mode a prefetch? + * + * @return true if read originates from a prefetch operation. 
+ */ + public boolean isPrefetch() { + return isPrefetch; + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java index 589aa641..88a545db 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java @@ -31,4 +31,24 @@ public void onGetRequest() { public void onHeadRequest() { LOG.trace("HEAD request made"); } + + @Override + public void onBlockPrefetch(long start, long end) { + LOG.trace("Block prefetch made"); + } + + @Override + public void footerParsingFailed() { + LOG.trace("Footer parsing failed"); + } + + @Override + public void onReadVectored(int numIncomingRanges, int numCombinedRanges) { + LOG.trace("Read vectored made"); + } + + @Override + public void onCacheHit() { + LOG.trace("Data was present in cache"); + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java index c90a0052..2c38e212 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java @@ -19,8 +19,32 @@ * GET and HEAD requests. */ public interface RequestCallback { + /** Called when a GET request is made. */ void onGetRequest(); + /** Called when a HEAD request is made. */ void onHeadRequest(); + + /** + * Called when a block prefetch is made. + * + * @param start start of prefetch block + * @param end end of prefetch block + */ + void onBlockPrefetch(long start, long end); + + /** Called when footer parsing fails. */ + void footerParsingFailed(); + + /** + * Called when a read vectored is made. 
+ * + * @param numIncomingRanges num of ranges in the read request + * @param numCombinedRanges num of ranges after range coalescing + */ + void onReadVectored(int numIncomingRanges, int numCombinedRanges); + + /** Called when the request read range is present in the block store. */ + void onCacheHit(); } diff --git a/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java b/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java index b9128e7f..e6571e2f 100644 --- a/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java +++ b/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java @@ -15,8 +15,10 @@ */ package software.amazon.s3.analyticsaccelerator.request; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -50,6 +49,23 @@ void testSize() { assertEquals(100, new Range(0, 99).getLength()); } + @Test + void testContains() { + Range range = new Range(0, 100); + assertTrue(range.contains(50)); + assertFalse(range.contains(120)); + } + + @Test + void testCompareTo() { + Range range1 = new Range(0, 100); + Range range2 = new Range(100, 200); + Range range3 = new Range(200, 300); + assertTrue(range1.compareTo(range2) < 0); + assertTrue(range2.compareTo(range3) < 0); + assertTrue(range3.compareTo(range1) > 0); + } + static Stream validStringRanges() { return Stream.of( Arguments.of(1, 5, "1-5"), diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index ceb36f34..3ac93a6d 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ 
b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -153,7 +153,8 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati createPhysicalIO(s3URI, openStreamInformation), telemetry, configuration.getLogicalIOConfiguration(), - parquetColumnPrefetchStore); + parquetColumnPrefetchStore, + openStreamInformation.getRequestCallback()); case SEQUENTIAL: return new SequentialLogicalIOImpl( diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java index 7dbb263a..a2b5a79f 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java @@ -20,6 +20,7 @@ import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; /** @@ -38,19 +39,26 @@ public class ParquetLogicalIOImpl extends DefaultLogicalIOImpl { * @param telemetry an instance of {@link Telemetry} to use * @param logicalIOConfiguration configuration for this logical IO implementation * @param parquetColumnPrefetchStore object where Parquet usage information is aggregated + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ public ParquetLogicalIOImpl( @NonNull S3URI s3Uri, @NonNull PhysicalIO physicalIO, @NonNull Telemetry telemetry, @NonNull LogicalIOConfiguration logicalIOConfiguration, - @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore) { + @NonNull 
ParquetColumnPrefetchStore parquetColumnPrefetchStore, + @NonNull RequestCallback requestCallback) { super(s3Uri, physicalIO, telemetry); // Initialise prefetcher and start prefetching this.parquetPrefetcher = new ParquetPrefetcher( - s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore); + s3Uri, + physicalIO, + telemetry, + logicalIOConfiguration, + parquetColumnPrefetchStore, + requestCallback); this.parquetPrefetcher.prefetchFooterAndBuildMetadata(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java index b7aa5616..a2e35fb7 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java @@ -30,6 +30,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -71,19 +72,21 @@ public class ParquetPrefetcher { * @param telemetry an instance of {@link Telemetry} to use * @param logicalIOConfiguration the LogicalIO's configuration * @param parquetColumnPrefetchStore a common place for Parquet usage information + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ public ParquetPrefetcher( S3URI s3Uri, PhysicalIO physicalIO, Telemetry telemetry, LogicalIOConfiguration logicalIOConfiguration, - ParquetColumnPrefetchStore parquetColumnPrefetchStore) { + ParquetColumnPrefetchStore 
parquetColumnPrefetchStore, + RequestCallback requestCallback) { this( s3Uri, logicalIOConfiguration, parquetColumnPrefetchStore, telemetry, - new ParquetMetadataParsingTask(s3Uri, parquetColumnPrefetchStore), + new ParquetMetadataParsingTask(s3Uri, parquetColumnPrefetchStore, requestCallback), new ParquetPrefetchTailTask(s3Uri, telemetry, logicalIOConfiguration, physicalIO), new ParquetReadTailTask(s3Uri, telemetry, logicalIOConfiguration, physicalIO), new ParquetPrefetchRemainingColumnTask( diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java index 34464e9f..c7da2bb4 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; /** @@ -37,6 +38,7 @@ public class ParquetMetadataParsingTask { private final S3URI s3URI; private final ParquetParser parquetParser; private final ParquetColumnPrefetchStore parquetColumnPrefetchStore; + private final RequestCallback requestCallback; private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataParsingTask.class); @@ -45,10 +47,13 @@ public class ParquetMetadataParsingTask { * * @param s3URI the S3Uri of the object * @param parquetColumnPrefetchStore object containing Parquet usage information + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ public ParquetMetadataParsingTask( - S3URI s3URI, 
ParquetColumnPrefetchStore parquetColumnPrefetchStore) { - this(s3URI, parquetColumnPrefetchStore, new ParquetParser()); + S3URI s3URI, + ParquetColumnPrefetchStore parquetColumnPrefetchStore, + RequestCallback requestCallback) { + this(s3URI, parquetColumnPrefetchStore, new ParquetParser(), requestCallback); } /** @@ -58,14 +63,17 @@ public ParquetMetadataParsingTask( * @param s3URI the S3Uri of the object * @param parquetColumnPrefetchStore object containing Parquet usage information * @param parquetParser parser for getting the file metadata + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ ParquetMetadataParsingTask( @NonNull S3URI s3URI, @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore, - @NonNull ParquetParser parquetParser) { + @NonNull ParquetParser parquetParser, + @NonNull RequestCallback requestCallback) { this.s3URI = s3URI; this.parquetParser = parquetParser; this.parquetColumnPrefetchStore = parquetColumnPrefetchStore; + this.requestCallback = requestCallback; } /** @@ -83,6 +91,7 @@ public ColumnMappers storeColumnMappers(FileTail fileTail) { parquetColumnPrefetchStore.putColumnMappers(this.s3URI, columnMappers); return columnMappers; } catch (Exception e) { + requestCallback.footerParsingFailed(); LOG.debug( "Unable to parse parquet footer for {}, parquet prefetch optimisations will be disabled for this key.", this.s3URI.getKey(), diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java index cf9c0480..5b66ea9f 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java @@ -110,7 +110,7 @@ public int read(long pos) throws IOException { } /** - * Reads data into the provided buffer + * Reads data into 
the provided buffer. * * @param buf buffer to read data into * @param off start position in buffer at which data is written @@ -120,6 +120,21 @@ public int read(long pos) throws IOException { * @throws IOException if an I/O error occurs */ public int read(byte[] buf, int off, int len, long pos) throws IOException { + return read(buf, off, len, pos, ReadMode.SYNC); + } + + /** + * Reads data into the provided buffer and accepts a readMode. + * + * @param buf buffer to read data into + * @param off start position in buffer at which data is written + * @param len length of data to be read + * @param pos the position to begin reading from + * @param readMode mode to define the read type + * @return the total number of bytes read into the buffer + * @throws IOException if an I/O error occurs + */ + public int read(byte[] buf, int off, int len, long pos, ReadMode readMode) throws IOException { Preconditions.checkArgument(0 <= pos, "`pos` must not be negative"); Preconditions.checkArgument(pos < contentLength(), "`pos` must be less than content length"); Preconditions.checkArgument(0 <= off, "`off` must not be negative"); @@ -128,7 +143,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException { try { lock.readLock().lock(); - blockManager.makeRangeAvailable(pos, len, ReadMode.SYNC); + blockManager.makeRangeAvailable(pos, len, readMode); long nextPosition = pos; int numBytesRead = 0; diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index 7f8143f3..2c618dd4 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -39,6 +39,7 @@ import software.amazon.s3.analyticsaccelerator.util.BlockKey; import 
software.amazon.s3.analyticsaccelerator.util.ObjectKey; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; /** Implements a Block Manager responsible for planning and scheduling reads on a key. */ @@ -59,6 +60,7 @@ public class BlockManager implements Closeable { private final SequentialReadProgression sequentialReadProgression; private final RangeOptimiser rangeOptimiser; private final OpenStreamInformation openStreamInformation; + private final RequestCallback requestCallback; private final int maxGeneration; private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available"; @@ -95,6 +97,7 @@ public BlockManager( this.indexCache = indexCache; this.blockStore = new BlockStore(indexCache, aggregatingMetrics, configuration); this.openStreamInformation = openStreamInformation; + this.requestCallback = openStreamInformation.getRequestCallback(); this.streamReader = new StreamReader( objectClient, @@ -153,7 +156,16 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod long endPos = pos + len - 1; // Range is available, return - if (isRangeAvailable(pos, endPos)) return; + if (isRangeAvailable(pos, endPos)) { + if (readMode == ReadMode.SYNC) { + requestCallback.onCacheHit(); + } + return; + } + + if (readMode.isPrefetch()) { + requestCallback.onBlockPrefetch(pos, endPos); + } long generation = getGeneration(pos, readMode); @@ -171,6 +183,9 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMod if (generation > 0) { maxReadLength = Math.max(maxReadLength, sequentialReadProgression.getSizeForGeneration(generation)); + + // Record any range extension due to sequential prefetching + requestCallback.onBlockPrefetch(endPos + 1, truncatePos(pos + maxReadLength - 1)); } // Truncate end position to the 
object length long effectiveEnd = truncatePos(pos + maxReadLength - 1); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index 146c2162..aca75b47 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -224,7 +224,15 @@ public int readTail(byte[] buf, int off, int len) throws IOException { @Override public IOPlanExecution execute(IOPlan ioPlan, ReadMode readMode) { if (readMode.coalesceRequests() && configuration.isRequestCoalesce()) { + int rangeSizeBeforeCoalescing = ioPlan.getPrefetchRanges().size(); ioPlan.coalesce(configuration.getRequestCoalesceTolerance()); + int coalescedRangeSize = ioPlan.getPrefetchRanges().size(); + + if (readMode == ReadMode.READ_VECTORED) { + openStreamInformation + .getRequestCallback() + .onReadVectored(rangeSizeBeforeCoalescing, coalescedRangeSize); + } } return telemetry.measureVerbose( () -> @@ -276,7 +284,12 @@ public void readVectored( } else { // there is no use of a temp byte buffer, or buffer.put() calls, // so flip() is not needed. - blob.read(buffer.array(), 0, objectRange.getLength(), objectRange.getOffset()); + blob.read( + buffer.array(), + 0, + objectRange.getLength(), + objectRange.getOffset(), + ReadMode.READ_VECTORED); } objectRange.getByteBuffer().complete(buffer); } catch (Exception e) { @@ -304,7 +317,7 @@ private void readIntoDirectBuffer(ByteBuffer buffer, Blob blob, ObjectRange rang (readBytes + tmpBufferMaxSize) < length ? 
tmpBufferMaxSize : (length - readBytes); LOG.debug( "Reading {} bytes from position {} (bytes read={}", currentLength, position, readBytes); - blob.read(tmp, 0, currentLength, position); + blob.read(tmp, 0, currentLength, position, ReadMode.READ_VECTORED); buffer.put(tmp, 0, currentLength); position = position + currentLength; readBytes = readBytes + currentLength; diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java index 8645d4bd..62a68c0d 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java @@ -40,7 +40,7 @@ public class SequentialReadProgression { public long getSizeForGeneration(long generation) { Preconditions.checkArgument(0 <= generation, "`generation` must be non-negative"); - // 2, 8, 32, 64 + // 4, 8, 16, 32 return Math.min( 2 * ONE_MB diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java index 23a296e4..c64c3a37 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java @@ -40,6 +40,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import 
software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -401,7 +402,8 @@ void testMultiThreadUsage() throws IOException, InterruptedException { physicalIO, TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)); + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl()); try (SeekableInputStream stream = new S3SeekableInputStream(TEST_URI, logicalIO, TestTelemetry.DEFAULT)) { byte[] buffer = new byte[4]; @@ -597,7 +599,8 @@ private S3SeekableInputStream getTestStreamWithContent(String content, S3URI s3U PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)), + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl()), TestTelemetry.DEFAULT); } @@ -628,7 +631,8 @@ public void testStreamMetadataConsistencyAfterTtlExpiry() PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)); + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl()); try (S3SeekableInputStream stream = new S3SeekableInputStream(TEST_URI, logicalIO, TestTelemetry.DEFAULT)) { diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java index e17d20f9..184cfc8b 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java @@ -29,6 +29,7 @@ import 
software.amazon.s3.analyticsaccelerator.io.physical.data.BlobStore; import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -76,7 +77,8 @@ public class S3SeekableInputStreamTestBase { PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, logicalIOConfiguration, - new ParquetColumnPrefetchStore(logicalIOConfiguration)); + new ParquetColumnPrefetchStore(logicalIOConfiguration), + new DefaultRequestCallbackImpl()); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java index d51450e3..8768c4c2 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java @@ -34,6 +34,7 @@ import software.amazon.s3.analyticsaccelerator.request.HeadRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -53,7 +54,8 @@ void testConstructor() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, 
mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); } @Test @@ -66,7 +68,8 @@ void testConstructorThrowsOnNullArgument() { null, TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> @@ -75,7 +78,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, null, - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> @@ -84,7 +88,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), null, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> @@ -93,7 +98,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - null)); + null, + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, @@ -103,7 +109,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); } @Test @@ -122,7 +129,8 @@ void testCloseDependencies() throws IOException { physicalIO, TestTelemetry.DEFAULT, configuration, - new ParquetColumnPrefetchStore(configuration)); + new ParquetColumnPrefetchStore(configuration), + new DefaultRequestCallbackImpl()); // When: close called logicalIO.close(); @@ -167,6 +175,7 @@ void testMetadaWithZeroContentLength() throws IOException { physicalIO, TestTelemetry.DEFAULT, 
LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT))); + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl())); } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java index 8257261a..a3f571ef 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java @@ -47,6 +47,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -63,7 +64,8 @@ public void testConstructor() { mock(PhysicalIO.class), mock(Telemetry.class), mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); } @Test @@ -195,7 +197,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), mock(Telemetry.class), mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -204,7 +207,8 @@ public void testConstructorNulls() { null, mock(Telemetry.class), mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -213,7 +217,8 @@ public void testConstructorNulls() { 
mock(PhysicalIO.class), null, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -222,7 +227,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), mock(Telemetry.class), null, - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -231,7 +237,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), mock(Telemetry.class), mock(LogicalIOConfiguration.class), - null)); + null, + mock(RequestCallback.class))); } @Test diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java index b8b2aece..4b4d97ec 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.utils.ImmutableMap; import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -57,7 +58,9 @@ public class ParquetMetadataParsingTaskTest { void testConstructor() { assertNotNull( new ParquetMetadataParsingTask( - TEST_URI, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT))); + TEST_URI, + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl())); } @Test @@ 
-66,19 +69,26 @@ void testConstructorFailsOnNull() { NullPointerException.class, () -> new ParquetMetadataParsingTask( - null, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT))); - assertThrows(NullPointerException.class, () -> new ParquetMetadataParsingTask(TEST_URI, null)); + null, + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl())); + assertThrows( + NullPointerException.class, + () -> new ParquetMetadataParsingTask(TEST_URI, null, new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> new ParquetMetadataParsingTask( null, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), - mock(ParquetParser.class))); + mock(ParquetParser.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, - () -> new ParquetMetadataParsingTask(TEST_URI, null, mock(ParquetParser.class))); + () -> + new ParquetMetadataParsingTask( + TEST_URI, null, mock(ParquetParser.class), new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, @@ -236,7 +246,8 @@ void testParsingExceptionsRemappedToCompletionException() throws IOException { new ParquetMetadataParsingTask( TEST_URI, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), - mockedParquetParser); + mockedParquetParser, + new DefaultRequestCallbackImpl()); CompletableFuture parquetMetadataTaskFuture = CompletableFuture.supplyAsync( () -> @@ -264,7 +275,8 @@ private ColumnMappers getColumnMappers(FileMetaData fileMetaData) throws IOExcep new ParquetMetadataParsingTask( TEST_URI, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), - mockedParquetParser); + mockedParquetParser, + new DefaultRequestCallbackImpl()); return parquetMetadataParsingTask.storeColumnMappers(new FileTail(ByteBuffer.allocate(0), 0)); }