From 95c7954e677ddae4ed0af978ec8d406e315f3751 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Tue, 7 Oct 2025 15:12:00 +0100 Subject: [PATCH 1/8] adds more stats --- .../request/ReadMode.java | 28 +++++++++++++------ .../util/DefaultRequestCallbackImpl.java | 10 +++++++ .../util/RequestCallback.java | 8 ++++++ .../S3SeekableInputStreamFactory.java | 3 +- .../io/logical/impl/ParquetLogicalIOImpl.java | 14 ++++++---- .../io/logical/impl/ParquetPrefetcher.java | 14 ++++++---- .../parquet/ParquetMetadataParsingTask.java | 11 ++++++-- .../io/physical/data/BlockManager.java | 13 +++++++++ .../prefetcher/SequentialReadProgression.java | 2 +- 9 files changed, 77 insertions(+), 26 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java index 74719893..678ff727 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java @@ -23,18 +23,19 @@ */ @AllArgsConstructor public enum ReadMode { - SYNC(true, false), - ASYNC(true, false), - SMALL_OBJECT_PREFETCH(true, false), - SEQUENTIAL_FILE_PREFETCH(true, false), - DICTIONARY_PREFETCH(false, false), - COLUMN_PREFETCH(false, true), - REMAINING_COLUMN_PREFETCH(false, false), - PREFETCH_TAIL(false, false), - READ_VECTORED(false, true); + SYNC(true, false, false), + ASYNC(true, false, true), + SMALL_OBJECT_PREFETCH(true, false, true), + SEQUENTIAL_FILE_PREFETCH(true, false, true), + DICTIONARY_PREFETCH(false, false, true), + COLUMN_PREFETCH(false, true, true), + REMAINING_COLUMN_PREFETCH(false, false, true), + PREFETCH_TAIL(false, false, true), + READ_VECTORED(false, true, false); private final boolean allowRequestExtension; private final boolean coalesceRequests; + private final boolean isPrefetch; /** * Should requests be extended for this read mode? @@ -60,4 +61,13 @@ public boolean allowRequestExtension() { public boolean coalesceRequests() { return coalesceRequests; } + + /** + * Is the request a prefetch? + * + * @return boolean + */ + public boolean isPrefetch() { + return isPrefetch; + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java index 589aa641..21eab023 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java @@ -31,4 +31,14 @@ public void onGetRequest() { public void onHeadRequest() { LOG.trace("HEAD request made"); } + + @Override + public void onBlockPrefetch(long start, long end) { + LOG.trace("Block prefetch made"); + } + + @Override + public void footerParsingFailed() { + LOG.trace("Footer parsing failed"); + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java index c90a0052..757acc18 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java @@ -19,8 +19,16 @@ * GET and HEAD requests. */ public interface RequestCallback { + /** Called when a GET request is made. */ void onGetRequest(); + /** Called when a HEAD request is made. */ void onHeadRequest(); + + /** Called when a block prefetch is made. */ + void onBlockPrefetch(long start, long end); + + + void footerParsingFailed(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index ceb36f34..3ac93a6d 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -153,7 +153,8 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati createPhysicalIO(s3URI, openStreamInformation), telemetry, configuration.getLogicalIOConfiguration(), - parquetColumnPrefetchStore); + parquetColumnPrefetchStore, + openStreamInformation.getRequestCallback()); case SEQUENTIAL: return new SequentialLogicalIOImpl( diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java index 7dbb263a..ba11c574 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java @@ -20,6 +20,7 @@ import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; /** @@ -40,17 +41,18 @@ public class ParquetLogicalIOImpl extends DefaultLogicalIOImpl { * @param parquetColumnPrefetchStore object where Parquet usage information is aggregated */ public ParquetLogicalIOImpl( - @NonNull S3URI s3Uri, - @NonNull PhysicalIO physicalIO, - @NonNull Telemetry telemetry, - @NonNull LogicalIOConfiguration logicalIOConfiguration, - @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore) { + @NonNull S3URI s3Uri, + @NonNull PhysicalIO physicalIO, + @NonNull Telemetry telemetry, + @NonNull LogicalIOConfiguration logicalIOConfiguration, + @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore, + @NonNull RequestCallback requestCallback) { super(s3Uri, physicalIO, telemetry); // Initialise prefetcher and start prefetching this.parquetPrefetcher = new ParquetPrefetcher( - s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore); + s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore, requestCallback); this.parquetPrefetcher.prefetchFooterAndBuildMetadata(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java index b7aa5616..ed5db434 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java @@ -30,6 +30,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution; import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; @@ -73,17 +74,18 @@ public class ParquetPrefetcher { * @param parquetColumnPrefetchStore a common place for Parquet usage information */ public ParquetPrefetcher( - S3URI s3Uri, - PhysicalIO physicalIO, - Telemetry telemetry, - LogicalIOConfiguration logicalIOConfiguration, - ParquetColumnPrefetchStore parquetColumnPrefetchStore) { + S3URI s3Uri, + PhysicalIO physicalIO, + Telemetry telemetry, + LogicalIOConfiguration logicalIOConfiguration, + ParquetColumnPrefetchStore parquetColumnPrefetchStore, + RequestCallback requestCallback) { this( s3Uri, logicalIOConfiguration, parquetColumnPrefetchStore, telemetry, - new ParquetMetadataParsingTask(s3Uri, parquetColumnPrefetchStore), + new ParquetMetadataParsingTask(s3Uri, parquetColumnPrefetchStore, requestCallback), new ParquetPrefetchTailTask(s3Uri, telemetry, logicalIOConfiguration, physicalIO), new ParquetReadTailTask(s3Uri, telemetry, logicalIOConfiguration, physicalIO), new ParquetPrefetchRemainingColumnTask( diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java index 34464e9f..a7e8f373 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; /** @@ -37,6 +38,7 @@ public class ParquetMetadataParsingTask { private final S3URI s3URI; private final ParquetParser parquetParser; private final ParquetColumnPrefetchStore parquetColumnPrefetchStore; + private final RequestCallback requestCallback; private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataParsingTask.class); @@ -47,8 +49,8 @@ public class ParquetMetadataParsingTask { * @param parquetColumnPrefetchStore object containing Parquet usage information */ public ParquetMetadataParsingTask( - S3URI s3URI, ParquetColumnPrefetchStore parquetColumnPrefetchStore) { - this(s3URI, parquetColumnPrefetchStore, new ParquetParser()); + S3URI s3URI, ParquetColumnPrefetchStore parquetColumnPrefetchStore, RequestCallback requestCallback) { + this(s3URI, parquetColumnPrefetchStore, new ParquetParser(), requestCallback); } /** @@ -62,10 +64,12 @@ public ParquetMetadataParsingTask( ParquetMetadataParsingTask( @NonNull S3URI s3URI, @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore, - @NonNull ParquetParser parquetParser) { + @NonNull ParquetParser parquetParser, + @NonNull RequestCallback requestCallback) { this.s3URI = s3URI; this.parquetParser = parquetParser; this.parquetColumnPrefetchStore = parquetColumnPrefetchStore; + this.requestCallback = requestCallback; } /** @@ -83,6 +87,7 @@ public ColumnMappers storeColumnMappers(FileTail fileTail) { parquetColumnPrefetchStore.putColumnMappers(this.s3URI, columnMappers); return columnMappers; } catch (Exception e) { + requestCallback.footerParsingFailed(); LOG.debug( "Unable to parse parquet footer for {}, parquet prefetch optimisations will be disabled for this key.", this.s3URI.getKey(), diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index 7f8143f3..75c54504 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -39,6 +39,7 @@ import software.amazon.s3.analyticsaccelerator.util.BlockKey; import software.amazon.s3.analyticsaccelerator.util.ObjectKey; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.StreamAttributes; /** Implements a Block Manager responsible for planning and scheduling reads on a key. */ @@ -59,6 +60,7 @@ public class BlockManager implements Closeable { private final SequentialReadProgression sequentialReadProgression; private final RangeOptimiser rangeOptimiser; private final OpenStreamInformation openStreamInformation; + private final RequestCallback requestCallback; private final int maxGeneration; private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available"; @@ -95,6 +97,7 @@ public BlockManager( this.indexCache = indexCache; this.blockStore = new BlockStore(indexCache, aggregatingMetrics, configuration); this.openStreamInformation = openStreamInformation; + this.requestCallback = openStreamInformation.getRequestCallback(); this.streamReader = new StreamReader( objectClient, @@ -155,6 +158,12 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod // Range is available, return if (isRangeAvailable(pos, endPos)) return; + if (readMode.isPrefetch()) { + System.out.println("READ MODE IS" + readMode.name()); + requestCallback.onBlockPrefetch(pos, endPos); + } + + long generation = getGeneration(pos, readMode); /* @@ -171,6 +180,10 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod if (generation > 0) { maxReadLength = Math.max(maxReadLength, sequentialReadProgression.getSizeForGeneration(generation)); + + System.out.println(maxReadLength); + + requestCallback.onBlockPrefetch(endPos, pos + maxReadLength - 1); } // Truncate end position to the object length long effectiveEnd = truncatePos(pos + maxReadLength - 1); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java index 8645d4bd..62a68c0d 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/prefetcher/SequentialReadProgression.java @@ -40,7 +40,7 @@ public class SequentialReadProgression { public long getSizeForGeneration(long generation) { Preconditions.checkArgument(0 <= generation, "`generation` must be non-negative"); - // 2, 8, 32, 64 + // 4, 8, 16, 32 return Math.min( 2 * ONE_MB From 65d85d75ef29553b991e3253c3ce5e5737bf77fa Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Wed, 8 Oct 2025 14:45:09 +0100 Subject: [PATCH 2/8] cleanup --- .../request/ReadMode.java | 6 ++--- .../util/RequestCallback.java | 9 +++++-- .../io/logical/impl/ParquetLogicalIOImpl.java | 20 +++++++++----- .../io/logical/impl/ParquetPrefetcher.java | 13 +++++----- .../parquet/ParquetMetadataParsingTask.java | 6 ++++- .../io/physical/data/BlockManager.java | 5 +--- .../impl/ParquetLogicalIOImplTest.java | 25 ++++++++++++------ .../logical/impl/ParquetPrefetcherTest.java | 19 +++++++++----- .../ParquetMetadataParsingTaskTest.java | 26 ++++++++++++++----- 9 files changed, 85 insertions(+), 44 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java index 678ff727..6f49baf7 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java @@ -62,10 +62,10 @@ public boolean coalesceRequests() { return coalesceRequests; } - /** - * Is the request a prefetch? + /** + * Is the read mode a prefetch? * - * @return boolean + * @return true if read originates from a prefetch operation. */ public boolean isPrefetch() { return isPrefetch; diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java index 757acc18..fe441b7c 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java @@ -26,9 +26,14 @@ public interface RequestCallback { /** Called when a HEAD request is made. */ void onHeadRequest(); - /** Called when a block prefetch is made. */ + /** + * Called when a block prefetch is made. + * + * @param start start of prefetch block + * @param end end of prefetch block + */ void onBlockPrefetch(long start, long end); - + /** Called when footer parsing fails. */ void footerParsingFailed(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java index ba11c574..a2b5a79f 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImpl.java @@ -39,20 +39,26 @@ public class ParquetLogicalIOImpl extends DefaultLogicalIOImpl { * @param telemetry an instance of {@link Telemetry} to use * @param logicalIOConfiguration configuration for this logical IO implementation * @param parquetColumnPrefetchStore object where Parquet usage information is aggregated + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ public ParquetLogicalIOImpl( - @NonNull S3URI s3Uri, - @NonNull PhysicalIO physicalIO, - @NonNull Telemetry telemetry, - @NonNull LogicalIOConfiguration logicalIOConfiguration, - @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore, - @NonNull RequestCallback requestCallback) { + @NonNull S3URI s3Uri, + @NonNull PhysicalIO physicalIO, + @NonNull Telemetry telemetry, + @NonNull LogicalIOConfiguration logicalIOConfiguration, + @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore, + @NonNull RequestCallback requestCallback) { super(s3Uri, physicalIO, telemetry); // Initialise prefetcher and start prefetching this.parquetPrefetcher = new ParquetPrefetcher( - s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore, requestCallback); + s3Uri, + physicalIO, + telemetry, + logicalIOConfiguration, + parquetColumnPrefetchStore, + requestCallback); this.parquetPrefetcher.prefetchFooterAndBuildMetadata(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java index ed5db434..a2e35fb7 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcher.java @@ -72,14 +72,15 @@ public class ParquetPrefetcher { * @param telemetry an instance of {@link Telemetry} to use * @param logicalIOConfiguration the LogicalIO's configuration * @param parquetColumnPrefetchStore a common place for Parquet usage information + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ public ParquetPrefetcher( - S3URI s3Uri, - PhysicalIO physicalIO, - Telemetry telemetry, - LogicalIOConfiguration logicalIOConfiguration, - ParquetColumnPrefetchStore parquetColumnPrefetchStore, - RequestCallback requestCallback) { + S3URI s3Uri, + PhysicalIO physicalIO, + Telemetry telemetry, + LogicalIOConfiguration logicalIOConfiguration, + ParquetColumnPrefetchStore parquetColumnPrefetchStore, + RequestCallback requestCallback) { this( s3Uri, logicalIOConfiguration, diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java index a7e8f373..c7da2bb4 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTask.java @@ -47,9 +47,12 @@ public class ParquetMetadataParsingTask { * * @param s3URI the S3Uri of the object * @param parquetColumnPrefetchStore object containing Parquet usage information + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ public ParquetMetadataParsingTask( - S3URI s3URI, ParquetColumnPrefetchStore parquetColumnPrefetchStore, RequestCallback requestCallback) { + S3URI s3URI, + ParquetColumnPrefetchStore parquetColumnPrefetchStore, + RequestCallback requestCallback) { this(s3URI, parquetColumnPrefetchStore, new ParquetParser(), requestCallback); } @@ -60,6 +63,7 @@ public ParquetMetadataParsingTask( * @param s3URI the S3Uri of the object * @param parquetColumnPrefetchStore object containing Parquet usage information * @param parquetParser parser for getting the file metadata + * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A */ ParquetMetadataParsingTask( @NonNull S3URI s3URI, diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index 75c54504..ae40bdec 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -159,11 +159,9 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod if (isRangeAvailable(pos, endPos)) return; if (readMode.isPrefetch()) { - System.out.println("READ MODE IS" + readMode.name()); requestCallback.onBlockPrefetch(pos, endPos); } - long generation = getGeneration(pos, readMode); /* @@ -181,8 +179,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod maxReadLength = Math.max(maxReadLength, sequentialReadProgression.getSizeForGeneration(generation)); - System.out.println(maxReadLength); - + // Record any range extension due to sequential prefetching requestCallback.onBlockPrefetch(endPos, pos + maxReadLength - 1); } // Truncate end position to the object length diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java index d51450e3..8768c4c2 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetLogicalIOImplTest.java @@ -34,6 +34,7 @@ import software.amazon.s3.analyticsaccelerator.request.HeadRequest; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -53,7 +54,8 @@ void testConstructor() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); } @Test @@ -66,7 +68,8 @@ void testConstructorThrowsOnNullArgument() { null, TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> @@ -75,7 +78,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, null, - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> @@ -84,7 +88,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), null, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> @@ -93,7 +98,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - null)); + null, + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, @@ -103,7 +109,8 @@ void testConstructorThrowsOnNullArgument() { mock(PhysicalIO.class), TestTelemetry.DEFAULT, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + new DefaultRequestCallbackImpl())); } @Test @@ -122,7 +129,8 @@ void testCloseDependencies() throws IOException { physicalIO, TestTelemetry.DEFAULT, configuration, - new ParquetColumnPrefetchStore(configuration)); + new ParquetColumnPrefetchStore(configuration), + new DefaultRequestCallbackImpl()); // When: close called logicalIO.close(); @@ -167,6 +175,7 @@ void testMetadaWithZeroContentLength() throws IOException { physicalIO, TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT))); + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl())); } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java index 8257261a..a3f571ef 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/impl/ParquetPrefetcherTest.java @@ -47,6 +47,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState; import software.amazon.s3.analyticsaccelerator.request.ReadMode; import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -63,7 +64,8 @@ public void testConstructor() { mock(PhysicalIO.class), mock(Telemetry.class), mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); } @Test @@ -195,7 +197,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), mock(Telemetry.class), mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -204,7 +207,8 @@ public void testConstructorNulls() { null, mock(Telemetry.class), mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -213,7 +217,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), null, mock(LogicalIOConfiguration.class), - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -222,7 +227,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), mock(Telemetry.class), null, - mock(ParquetColumnPrefetchStore.class))); + mock(ParquetColumnPrefetchStore.class), + mock(RequestCallback.class))); assertThrows( NullPointerException.class, () -> @@ -231,7 +237,8 @@ public void testConstructorNulls() { mock(PhysicalIO.class), mock(Telemetry.class), mock(LogicalIOConfiguration.class), - null)); + null, + mock(RequestCallback.class))); } @Test diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java index b8b2aece..4b4d97ec 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/logical/parquet/ParquetMetadataParsingTaskTest.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.utils.ImmutableMap; import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration; import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.S3URI; @SuppressFBWarnings( @@ -57,7 +58,9 @@ public class ParquetMetadataParsingTaskTest { void testConstructor() { assertNotNull( new ParquetMetadataParsingTask( - TEST_URI, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT))); + TEST_URI, + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl())); } @Test @@ -66,19 +69,26 @@ void testConstructorFailsOnNull() { NullPointerException.class, () -> new ParquetMetadataParsingTask( - null, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT))); - assertThrows(NullPointerException.class, () -> new ParquetMetadataParsingTask(TEST_URI, null)); + null, + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl())); + assertThrows( + NullPointerException.class, + () -> new ParquetMetadataParsingTask(TEST_URI, null, new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, () -> new ParquetMetadataParsingTask( null, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), - mock(ParquetParser.class))); + mock(ParquetParser.class), + new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, - () -> new ParquetMetadataParsingTask(TEST_URI, null, mock(ParquetParser.class))); + () -> + new ParquetMetadataParsingTask( + TEST_URI, null, mock(ParquetParser.class), new DefaultRequestCallbackImpl())); assertThrows( NullPointerException.class, @@ -236,7 +246,8 @@ void testParsingExceptionsRemappedToCompletionException() throws IOException { new ParquetMetadataParsingTask( TEST_URI, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), - mockedParquetParser); + mockedParquetParser, + new DefaultRequestCallbackImpl()); CompletableFuture parquetMetadataTaskFuture = CompletableFuture.supplyAsync( () -> @@ -264,7 +275,8 @@ private ColumnMappers getColumnMappers(FileMetaData fileMetaData) throws IOExcep new ParquetMetadataParsingTask( TEST_URI, new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), - mockedParquetParser); + mockedParquetParser, + new DefaultRequestCallbackImpl()); return parquetMetadataParsingTask.storeColumnMappers(new FileTail(ByteBuffer.allocate(0), 0)); } From 8dc0ad26b55515ef226354fa339832bbdbd96f98 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Wed, 8 Oct 2025 14:54:22 +0100 Subject: [PATCH 3/8] test fix --- .../analyticsaccelerator/S3SeekableInputStreamTestBase.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java index e17d20f9..184cfc8b 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTestBase.java @@ -29,6 +29,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.data.BlobStore; import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -76,7 +77,8 @@ public class S3SeekableInputStreamTestBase { PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, logicalIOConfiguration, - new ParquetColumnPrefetchStore(logicalIOConfiguration)); + new ParquetColumnPrefetchStore(logicalIOConfiguration), + new DefaultRequestCallbackImpl()); } catch (IOException e) { throw new RuntimeException(e); } From 9d7b7cf6b1948522e428e8567a71cefe39cc8034 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Thu, 9 Oct 2025 12:43:04 +0100 Subject: [PATCH 4/8] readVectored() stats --- .../request/ReadMode.java | 2 +- .../util/DefaultRequestCallbackImpl.java | 5 ++++ .../util/RequestCallback.java | 8 +++++ .../io/physical/impl/PhysicalIOImpl.java | 8 +++++ .../util/AnalyticsAcceleratorUtils.java | 30 +++++++++++++++++++ .../S3SeekableInputStreamTest.java | 10 +++++-- 6 files changed, 59 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java index 6f49baf7..a411f735 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/request/ReadMode.java @@ -62,7 +62,7 @@ public boolean coalesceRequests() { return coalesceRequests; } - /** + /** * Is the read mode a prefetch? * * @return true if read originates from a prefetch operation. diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java index 21eab023..1018e80a 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java @@ -41,4 +41,9 @@ public void onBlockPrefetch(long start, long end) { public void footerParsingFailed() { LOG.trace("Footer parsing failed"); } + + @Override + public void onReadVectored(int numIncomingRanges, int numCombinedRanges) { + LOG.trace("Read vectored made"); + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java index fe441b7c..6336a967 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java @@ -36,4 +36,12 @@ public interface RequestCallback { /** Called when footer parsing fails. */ void footerParsingFailed(); + + /** + * Called when a read vectored is made. + * + * @param numIncomingRanges num of ranges in the read request + * @param numCombinedRanges num of ranges after range coalescing + */ + void onReadVectored(int numIncomingRanges, int numCombinedRanges); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index 146c2162..a85a135e 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -224,7 +224,15 @@ public int readTail(byte[] buf, int off, int len) throws IOException { @Override public IOPlanExecution execute(IOPlan ioPlan, ReadMode readMode) { if (readMode.coalesceRequests() && configuration.isRequestCoalesce()) { + int rangeSizeBeforeCoalescing = ioPlan.getPrefetchRanges().size(); ioPlan.coalesce(configuration.getRequestCoalesceTolerance()); + int coalescedRangeSize = ioPlan.getPrefetchRanges().size(); + + if (readMode == ReadMode.READ_VECTORED) { + openStreamInformation + .getRequestCallback() + .onReadVectored(rangeSizeBeforeCoalescing, coalescedRangeSize); + } } return telemetry.measureVerbose( () -> diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java index 23601bbc..27bb0dda 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java @@ -15,7 +15,11 @@ */ package software.amazon.s3.analyticsaccelerator.util; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; +import software.amazon.s3.analyticsaccelerator.request.Range; /** * Utility class for object size-related operations and determinations. Provides methods to classify @@ -33,4 +37,30 @@ public static boolean isSmallObject(PhysicalIOConfiguration configuration, long return configuration.isSmallObjectsPrefetchingEnabled() && contentLength <= configuration.getSmallObjectSizeThreshold(); } + + public static List coalesceRanges(List currentRanges, long tolerance) { + + List coalescedRages = new ArrayList<>(); + + if (currentRanges.size() < 2) { + return currentRanges; + } + + // Ensure ranges are ordered by their start position. + Collections.sort(currentRanges); + Range currentRange = currentRanges.get(0); + for (int i = 1; i < currentRanges.size(); i++) { + Range nextRange = currentRanges.get(i); + + if (currentRange.getEnd() + tolerance >= nextRange.getStart()) { + currentRange = + new Range(currentRange.getStart(), Math.max(currentRange.getEnd(), nextRange.getEnd())); + } else { + coalescedRages.add(currentRange); + currentRange = nextRange; + } + } + + return coalescedRages; + } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java index 23a296e4..c64c3a37 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java @@ -40,6 +40,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.data.MetadataStore; import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.DefaultRequestCallbackImpl; import software.amazon.s3.analyticsaccelerator.util.FakeObjectClient; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -401,7 +402,8 @@ void testMultiThreadUsage() throws IOException, InterruptedException { physicalIO, TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)); + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl()); try (SeekableInputStream stream = new S3SeekableInputStream(TEST_URI, logicalIO, TestTelemetry.DEFAULT)) { byte[] buffer = new byte[4]; @@ -597,7 +599,8 @@ private S3SeekableInputStream getTestStreamWithContent(String content, S3URI s3U PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)), + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl()), TestTelemetry.DEFAULT); } @@ -628,7 +631,8 @@ public void testStreamMetadataConsistencyAfterTtlExpiry() PhysicalIOConfiguration.DEFAULT), TestTelemetry.DEFAULT, LogicalIOConfiguration.DEFAULT, - new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)); + new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT), + new DefaultRequestCallbackImpl()); try (S3SeekableInputStream stream = new S3SeekableInputStream(TEST_URI, logicalIO, TestTelemetry.DEFAULT)) { From cefe410532db9115623c4d4a45136be6c0e0fb72 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Thu, 9 Oct 2025 15:11:47 +0100 Subject: [PATCH 5/8] test fixes --- .../request/RangeTest.java | 20 +++++++++++-- .../util/AnalyticsAcceleratorUtils.java | 30 ------------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java b/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java index b9128e7f..e6571e2f 100644 --- a/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java +++ b/common/src/test/java/software/amazon/s3/analyticsaccelerator/request/RangeTest.java @@ -15,8 +15,7 @@ */ package software.amazon.s3.analyticsaccelerator.request; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -50,6 +49,23 @@ void testSize() { assertEquals(100, new Range(0, 99).getLength()); } + @Test + void testContains() { + Range range = new Range(0, 100); + assertTrue(range.contains(50)); + assertFalse(range.contains(120)); + } + + @Test + void testCompareTo() { + Range range1 = new Range(0, 100); + Range range2 = new Range(100, 200); + Range range3 = new Range(200, 300); + assertTrue(range1.compareTo(range2) < 0); + assertTrue(range2.compareTo(range3) < 0); + assertTrue(range3.compareTo(range1) > 0); + } + static Stream validStringRanges() { return Stream.of( Arguments.of(1, 5, "1-5"), diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java index 27bb0dda..23601bbc 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/AnalyticsAcceleratorUtils.java @@ -15,11 +15,7 @@ */ package software.amazon.s3.analyticsaccelerator.util; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; -import software.amazon.s3.analyticsaccelerator.request.Range; /** * Utility class for object size-related operations and determinations. Provides methods to classify @@ -37,30 +33,4 @@ public static boolean isSmallObject(PhysicalIOConfiguration configuration, long return configuration.isSmallObjectsPrefetchingEnabled() && contentLength <= configuration.getSmallObjectSizeThreshold(); } - - public static List coalesceRanges(List currentRanges, long tolerance) { - - List coalescedRages = new ArrayList<>(); - - if (currentRanges.size() < 2) { - return currentRanges; - } - - // Ensure ranges are ordered by their start position. - Collections.sort(currentRanges); - Range currentRange = currentRanges.get(0); - for (int i = 1; i < currentRanges.size(); i++) { - Range nextRange = currentRanges.get(i); - - if (currentRange.getEnd() + tolerance >= nextRange.getStart()) { - currentRange = - new Range(currentRange.getStart(), Math.max(currentRange.getEnd(), nextRange.getEnd())); - } else { - coalescedRages.add(currentRange); - currentRange = nextRange; - } - } - - return coalescedRages; - } } From 8609bfea5d71fd73f7728fa58c82a4967900d08d Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 10 Oct 2025 14:18:01 +0100 Subject: [PATCH 6/8] cache hit metrics --- .../util/DefaultRequestCallbackImpl.java | 5 +++++ .../util/RequestCallback.java | 2 ++ .../io/physical/data/Blob.java | 19 +++++++++++++++++-- .../io/physical/data/BlockManager.java | 7 ++++++- .../io/physical/impl/PhysicalIOImpl.java | 6 +++--- 5 files changed, 33 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java index 1018e80a..88a545db 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/DefaultRequestCallbackImpl.java @@ -46,4 +46,9 @@ public void footerParsingFailed() { public void onReadVectored(int numIncomingRanges, int numCombinedRanges) { LOG.trace("Read vectored made"); } + + @Override + public void onCacheHit() { + LOG.trace("Data was present in cache"); + } } diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java index 6336a967..f28cf07b 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java @@ -44,4 +44,6 @@ public interface RequestCallback { * @param numCombinedRanges num of ranges after range coalescing */ void onReadVectored(int numIncomingRanges, int numCombinedRanges); + + void onCacheHit(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java index cf9c0480..a1a69dc2 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; + import lombok.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +111,7 @@ public int read(long pos) throws IOException { } /** - * Reads data into the provided buffer + * Reads data into the provided buffer. * * @param buf buffer to read data into * @param off start position in buffer at which data is written @@ -120,6 +121,20 @@ public int read(long pos) throws IOException { * @throws IOException if an I/O error occurs */ public int read(byte[] buf, int off, int len, long pos) throws IOException { + return read(buf, off, len, pos, ReadMode.SYNC); + } + + /** + * Reads data into the provided buffer and accepts a readMode. + * + * @param buf buffer to read data into + * @param off start position in buffer at which data is written + * @param len length of data to be read + * @param pos the position to begin reading from + * @return the total number of bytes read into the buffer + * @throws IOException if an I/O error occurs + */ + public int read(byte[] buf, int off, int len, long pos, ReadMode readMode) throws IOException { Preconditions.checkArgument(0 <= pos, "`pos` must not be negative"); Preconditions.checkArgument(pos < contentLength(), "`pos` must be less than content length"); Preconditions.checkArgument(0 <= off, "`off` must not be negative"); @@ -128,7 +143,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException { try { lock.readLock().lock(); - blockManager.makeRangeAvailable(pos, len, ReadMode.SYNC); + blockManager.makeRangeAvailable(pos, len, readMode); long nextPosition = pos; int numBytesRead = 0; diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index ae40bdec..7f6f075c 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -156,7 +156,12 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod long endPos = pos + len - 1; // Range is available, return - if (isRangeAvailable(pos, endPos)) return; + if (isRangeAvailable(pos, endPos)) { + if (readMode == ReadMode.SYNC) { + openStreamInformation.getRequestCallback().onCacheHit(); + } + return; + } if (readMode.isPrefetch()) { requestCallback.onBlockPrefetch(pos, endPos); diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index a85a135e..7e1be046 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -264,7 +264,7 @@ public void readVectored( Blob blob = blobStore.get(objectKey, this.metadata, openStreamInformation); makeReadVectoredRangesAvailable(objectRanges); - + for (ObjectRange objectRange : objectRanges) { ByteBuffer buffer = allocate.apply(objectRange.getLength()); threadPool.submit( @@ -284,7 +284,7 @@ public void readVectored( } else { // there is no use of a temp byte buffer, or buffer.put() calls, // so flip() is not needed. - blob.read(buffer.array(), 0, objectRange.getLength(), objectRange.getOffset()); + blob.read(buffer.array(), 0, objectRange.getLength(), objectRange.getOffset(), ReadMode.READ_VECTORED); } objectRange.getByteBuffer().complete(buffer); } catch (Exception e) { @@ -312,7 +312,7 @@ private void readIntoDirectBuffer(ByteBuffer buffer, Blob blob, ObjectRange rang (readBytes + tmpBufferMaxSize) < length ? tmpBufferMaxSize : (length - readBytes); LOG.debug( "Reading {} bytes from position {} (bytes read={}", currentLength, position, readBytes); - blob.read(tmp, 0, currentLength, position); + blob.read(tmp, 0, currentLength, position, ReadMode.READ_VECTORED); buffer.put(tmp, 0, currentLength); position = position + currentLength; readBytes = readBytes + currentLength; From 54b2f78edb74c83891b5a670e2adb80db4a848e9 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Fri, 10 Oct 2025 14:22:33 +0100 Subject: [PATCH 7/8] spotlessApply --- .../s3/analyticsaccelerator/util/RequestCallback.java | 1 + .../s3/analyticsaccelerator/io/physical/data/Blob.java | 2 +- .../io/physical/impl/PhysicalIOImpl.java | 9 +++++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java index f28cf07b..2c38e212 100644 --- a/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java +++ b/common/src/main/java/software/amazon/s3/analyticsaccelerator/util/RequestCallback.java @@ -45,5 +45,6 @@ public interface RequestCallback { */ void onReadVectored(int numIncomingRanges, int numCombinedRanges); + /** Called when the request read range is present in the block store. */ void onCacheHit(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java index a1a69dc2..5b66ea9f 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/Blob.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; - import lombok.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,6 +130,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException { * @param off start position in buffer at which data is written * @param len length of data to be read * @param pos the position to begin reading from + * @param readMode mode to define the read type * @return the total number of bytes read into the buffer * @throws IOException if an I/O error occurs */ diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java index 7e1be046..aca75b47 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/impl/PhysicalIOImpl.java @@ -264,7 +264,7 @@ public void readVectored( Blob blob = blobStore.get(objectKey, this.metadata, openStreamInformation); makeReadVectoredRangesAvailable(objectRanges); - + for (ObjectRange objectRange : objectRanges) { ByteBuffer buffer = allocate.apply(objectRange.getLength()); threadPool.submit( @@ -284,7 +284,12 @@ public void readVectored( } else { // there is no use of a temp byte buffer, or buffer.put() calls, // so flip() is not needed. - blob.read(buffer.array(), 0, objectRange.getLength(), objectRange.getOffset(), ReadMode.READ_VECTORED); + blob.read( + buffer.array(), + 0, + objectRange.getLength(), + objectRange.getOffset(), + ReadMode.READ_VECTORED); } objectRange.getByteBuffer().complete(buffer); } catch (Exception e) { From a04b838ac464c43ed7d62fd02b2b811d2b199170 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Mon, 13 Oct 2025 14:50:27 +0100 Subject: [PATCH 8/8] bug fix --- .../s3/analyticsaccelerator/io/physical/data/BlockManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java index 7f6f075c..2c618dd4 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/BlockManager.java @@ -185,7 +185,7 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod Math.max(maxReadLength, sequentialReadProgression.getSizeForGeneration(generation)); // Record any range extension due to sequential prefetching - requestCallback.onBlockPrefetch(endPos, pos + maxReadLength - 1); + requestCallback.onBlockPrefetch(endPos + 1, truncatePos(pos + maxReadLength - 1)); } // Truncate end position to the object length long effectiveEnd = truncatePos(pos + maxReadLength - 1);