Conversation
Added iostats get request callback to StreamReader (#317)

## Description of change
This PR moves the IOStat callback method request from Block to StreamReader. This needs to be done as part of the code rebase.
---
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).

Telemetry support for physical IO (#318)

## Description of change
This PR adds telemetry measures for StreamReader and BlockManager.

Introducing retry policy to new PhysicalIO (#320)

## Description of change
This PR adds retry capability to the new PhysicalIO.
#### Relevant issues
#286 #287 #288 #289 #294 #316
#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?
No
#### Does this contribution introduce any new public APIs or behaviors?
No
#### How was the contribution tested?
Unit tests
---
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
## Description of change
This PR rebases and integrates the changes in PR #321.
#### Relevant issues
#286 #287 #288 #289 #294 #316 #320
#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?
No
#### Does this contribution introduce any new public APIs or behaviors?
No
#### How was the contribution tested?
Unit tests
---
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
## Description of change
This PR changes the default read buffer size to 128KB for better performance.
#### Relevant issues
#286 #287 #288 #289 #294 #316 #320 #323
#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?
No
#### Does this contribution introduce any new public APIs or behaviors?
No
#### How was the contribution tested?
Unit tests
#### Does this contribution need a changelog entry?
N/A
---
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
## Description of change
This PR updates the Block.read method to throw an exception without retrying when the SDK throws a Status Code 412 exception, i.e. when the ETag of a file changes.
#### Relevant issues
#325
#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?
No
#### Does this contribution introduce any new public APIs or behaviors?
No
#### How was the contribution tested?
Unit tests, integration tests
#### Does this contribution need a changelog entry?
N/A
---
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
I know there are multiple sub-PRs, but can you update the description for this one with some details on the overall changes and decisions? I am struggling to review this, and that might help.
String.format(
    "This block object key %s (for position %s) should have been available.",
    objectKey.getS3URI(), pos));
}
I think you can write it like this, what do you think?

return block.orElseThrow(() -> new IllegalStateException(
    String.format(
        "This block object key %s (for position %s) should have been available.",
        objectKey.getS3URI(), pos)))
    .read(pos);
return blockManager.getBlock(pos).get().read(pos);
Optional<Block> block = blockManager.getBlock(pos);
if (!block.isPresent()) {
  throw new IllegalStateException(
Is IllegalStateException the correct one?

Normally we shouldn't face this error, so I kept it consistent with the existing system.
indexCache.recordAccess(blockKey);
return Byte.toUnsignedInt(content[posToOffset(pos)]);
awaitDataWithRetry();
if (error != null) {
These should be this.error and this.data, right? To show that they're from the class.
byte[] content = this.retryStrategy.get(this::getData);
indexCache.recordAccess(blockKey);
awaitDataWithRetry();
if (error != null) {
if (error != null) {
  throw error;
}
if (data == null) {
  throw new IOException("Error while reading data. Block data is null after successful await");
}

This is repeated a lot; can't you move it into awaitDataWithRetry?
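A minimal sketch of the consolidation being suggested, with the repeated post-await checks folded into a single helper. The class, field, and method names here are illustrative assumptions, not the project's actual API:

```java
import java.io.IOException;

// Hypothetical sketch: fold the repeated post-await error/null checks into one
// helper so every read path shares the same error handling. Field names
// (error, data) mirror the snippet above but are assumptions.
class BlockSketch {
  IOException error;
  byte[] data;

  byte[] awaitDataAndGet() throws IOException {
    // ... the awaitData()/retry logic would run here ...
    if (this.error != null) {
      throw this.error;
    }
    if (this.data == null) {
      throw new IOException("Block data is null after successful await");
    }
    return this.data;
  }
}
```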
*/
private void awaitData() throws IOException {
  try {
    if (!dataReadyLatch.await(readTimeout, TimeUnit.MILLISECONDS)) {
Doesn't retryStrategy have a timeout already? Why do we need more than one?

If we don't set a timeout for await, it will wait forever for the CountDownLatch to reach zero. This timeout prevents that from happening.
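A minimal sketch of the bounded wait being described (class and field names are illustrative assumptions). CountDownLatch.await with a timeout returns false instead of blocking forever, which is exactly the behavior the comment relies on:

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded wait on a data-ready latch. Without the
// timeout overload, await() would block indefinitely if the producer never
// calls countDown().
class AwaitSketch {
  private final CountDownLatch dataReadyLatch = new CountDownLatch(1);
  private final long readTimeoutMillis;

  AwaitSketch(long readTimeoutMillis) {
    this.readTimeoutMillis = readTimeoutMillis;
  }

  void markDataReady() {
    dataReadyLatch.countDown();
  }

  void awaitData() throws IOException {
    try {
      if (!dataReadyLatch.await(readTimeoutMillis, TimeUnit.MILLISECONDS)) {
        throw new IOException("Timed out waiting for block data");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for block data", e);
    }
  }
}
```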
@ozkoca could you please explain how these retries are working? From what I understand, if readTimeout > 0, you add TimeoutException and some other exceptions to the retry policy.
Then if dataReadyLatch.await times out, the retry handler will call awaitData() again?
There are two retry policies overall: 1/ the default retry policy of the application, and 2/ the RetryPolicy created by the end user during stream creation.
The default retry policy is created from PhysicalIOConfiguration's blockreadtimeout and blockreadretrycount. This policy is used by both the StreamReader and Block.read methods.
The second retry policy is the one provided by the user, which is passed to the stream during stream creation. At the time of creating a retry strategy, we merge both policies into one strategy.
For the retry strategy you can get more information from #321 and #307.
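A hypothetical sketch of the merge described above. The types and names here are illustrative only (the project's real RetryStrategy/RetryPolicy API differs); it just shows the idea that the merged strategy honors both sets of retryable exceptions and the stronger retry count:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: merge a configuration-derived default policy with a
// user-supplied one into a single strategy. The merged strategy retries on the
// union of both exception sets and keeps the larger retry count.
class RetryPolicySketch {
  final Set<Class<? extends Exception>> retryOn;
  final int maxRetries;

  RetryPolicySketch(Set<Class<? extends Exception>> retryOn, int maxRetries) {
    this.retryOn = retryOn;
    this.maxRetries = maxRetries;
  }

  static RetryPolicySketch merge(RetryPolicySketch defaults, RetryPolicySketch userSupplied) {
    Set<Class<? extends Exception>> union = new HashSet<>(defaults.retryOn);
    union.addAll(userSupplied.retryOn);
    return new RetryPolicySketch(union, Math.max(defaults.maxRetries, userSupplied.maxRetries));
  }
}
```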
I think this is on me, and I am having second thoughts here. We now have two places for customers to pass us information about retries: 1/ with Configuration, which has ReadTimeOut and ReadRetryCount, and 2/ with RetryStrategy. So far I was thinking we should merge the two, but looking more into usage patterns, maybe 2 should overwrite 1 if provided. WDYT?
* @return an {@link Optional} containing the {@link Block} if present; otherwise, {@link
*     Optional#empty()}
*/
public synchronized Optional<Block> getBlock(long pos) {
Why does this have to be synchronized?

If it is not synchronized, the BlockManager object may return blocks which are being removed by the removeBlocks method.
But doesn't this stop multiple blocks from being read at the same time?
> BlockManager object may return blocks which are being removed by removeBlocks method

Even with the synchronisation, isn't this still possible? The synchronisation is on the individual methods, so you can have one thread calling removeBlocks() and another thread in getBlock(), and getBlock() can return a block that is about to be removed by removeBlocks.
If multiple methods are synchronized on an object, multiple threads cannot call both methods at the same time. There is an object lock: if one thread enters add, the object lock is taken and another thread cannot enter subtract.

public class MyCounter {
  private int count = 0;

  public synchronized void add(int value) {
    this.count += value;
  }

  public synchronized void subtract(int value) {
    this.count -= value;
  }
}
Today this is already synchronized, but I believe we can avoid it going forward. I assume this would need additional changes; therefore, shall we try to address it in a follow-up PR?
ahmarsuhail
left a comment
Thanks a lot @ozkoca, really like these changes!
I don't understand the retry and block removal stuff completely. For example, what happens when you're reading blocks [1, 2, 3, 4] from a single GET, and the read fails on block 3. I think you'll remove [3, 4], but what does this mean for the calling application? Will discuss with you.
Also concerned about what these changes mean for sequential prefetching.
I have not reviewed the test code yet, I'm just trying to understand the actual code for now. Javadocs with more examples would be helpful.
// TODO: This should be fixed with the new PhysicalIO, currently the cache hit metric is
// inaccurate.
// assertEquals(expectedGETCount,
// s3AALClientStreamReader.getS3SeekableInputStreamFactory().getMetrics().get(MetricKey.CACHE_HIT));
Why has this been cut?

Due to performance issues we faced, we are not publishing CACHE_HIT and CACHE_MISS metrics in the new PhysicalIO.
*/
private RetryStrategy createRetryStrategy() throws IOException {
@SuppressWarnings("unchecked")
private RetryStrategy createRetryStrategy() {
Detailed Javadoc on what changes you're making to the retry policy here would be helpful. It is not clear to me from this code how readTimeout > 0 is related to adding new retries.
*/
public void setData(final byte[] data) {
  this.data = data;
  this.aggregatingMetrics.add(MetricKey.MEMORY_USAGE, data.length);
Question: is this the right place to increment this metric? Or should this happen at block creation time?
Both approaches are valid, each with different trade-offs:
Increasing metric during block creation (early reservation):
- Reserves memory capacity upfront before data is actually loaded
- May lead to over-reservation since we count memory usage before it's actually consumed
- Prevents memory limit breaches but could cause unnecessary capacity constraints
Increasing metric in setData (actual usage tracking):
- Accurately reflects real memory consumption only after data is loaded
- May temporarily exceed memory limits during the gap between data loading and metric updates
- Provides more precise memory accounting but risks brief capacity overruns
The current implementation uses the second approach (setData) to ensure metrics reflect actual memory usage rather than projected usage, accepting the small risk of temporary limit exceedance for better accuracy.
We can discuss both approaches, or a different one if you have an alternative in mind.
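A minimal sketch of the second approach (actual usage tracking in setData). The class and metric names here are illustrative assumptions, not the project's real MetricKey/metrics API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of "actual usage tracking": memory is counted only when
// data really lands in the block, so the gauge reflects consumption rather
// than an upfront reservation made at block creation time.
class MemoryMetricSketch {
  private final AtomicLong memoryUsage = new AtomicLong();
  private byte[] data;

  void setData(byte[] data) {
    this.data = data;
    // incremented here, in setData, not at block creation time
    memoryUsage.addAndGet(data.length);
  }

  long getMemoryUsage() {
    return memoryUsage.get();
  }
}
```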
// object
if (!readMode.allowRequestExtension() || pos < configuration.getReadBufferSize()) return 0;

Optional<Block> previousBlock = blockStore.getBlock(pos - 1);
With the new logic, doesn't this become too aggressive now? Previously we had blocks for larger ranges that corresponded to the actual reads, so the chance of something becoming a sequential read was much lower, e.g. you would have blocks [0-4MB, 8MB-10MB].
Now you have blocks of [0-128KB, 128KB+1-256KB, 256KB+1-384KB], so every read will trigger this GP?
You are right. We may need to tune the sequentialprefetch.base or sequentialprefetch.speed configurations, or even change the logic in the method according to performance results. Noted down for future tasks.
I am worried this is already having a really bad impact on the current benchmarks; we should validate and address it now.
Actually it works similarly to the existing behaviour. We are not increasing the generation for blocks which are read together; if we read multiple blocks in a single request, we set the same generation for all of them. So this does not differ much from the existing system. The only difference is that, due to the fixed block size, we may currently read more bytes than we need. For example, let's say we are reading the bytes at positions 0 and 128KB. In the existing system, when we first read position 0, due to the read-ahead bytes we first read [0-64KB] and create two blocks [0-1] and [1-64KB] with generation 0. Next, when we read the byte at 128KB, since byte 128KB-1 does not exist, we read [128KB-192KB] with generation 0.
In the new design, when we first read position 0, we create one block [0-128KB] with generation 0. Next, when we read 128KB, we check whether byte 128KB-1 exists. Since it exists within the block we created, we create the next block [128KB-256KB] with generation 1.
Since this is an edge case, and the new PhysicalIO serves blocks as soon as they are ready, this should not impact overall performance considerably.
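A hypothetical sketch of the generation rule described above (all names are illustrative): a new block continues a generation chain only when the byte immediately before its start position is already covered by a cached block, otherwise it starts at generation 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the generation rule: a block inherits previous
// generation + 1 only when the block immediately before it already exists;
// a read that starts after a gap begins a fresh chain at generation 0.
class GenerationSketch {
  // maps block start position -> generation assigned to that block
  private final Map<Long, Integer> generationByStart = new HashMap<>();
  private final long blockSize;

  GenerationSketch(long blockSize) {
    this.blockSize = blockSize;
  }

  int generationForNewBlock(long startPos) {
    Integer prevGen = (startPos == 0) ? null : generationByStart.get(startPos - blockSize);
    int gen = (prevGen == null) ? 0 : prevGen + 1;
    generationByStart.put(startPos, gen);
    return gen;
  }
}
```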
*/
private List<List<Integer>> splitGroupIntoChunks(
    List<Integer> group, int blocksPerTargetRequest) {
  int maxBlocksBeforeSplit = calculateMaxBlocksBeforeSplit(blocksPerTargetRequest);
You already calculate this in optimizeReads(), so can't you just pass that value in here? Why calculate it again?
List<Integer> previousChunk = chunks.get(chunks.size() - 2);

if (lastChunk.size() < previousChunk.size()
    && (lastChunk.size() + previousChunk.size()) <= maxBlocksBeforeSplit) {
Say you have chunks <[7,8,9],[10]> and your maxBlocksBeforeSplit is 3, so we'll make two separate GETs, one for [7,8,9] and then one for [10], right?
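A hypothetical sketch of the splitting behaviour this thread discusses (names are illustrative, not the project's actual implementation): a group of contiguous block indices is cut into chunks of at most maxBlocksBeforeSplit entries, each chunk becoming one GET, so the reviewer's <[7,8,9],[10]> example indeed yields two requests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a contiguous group of block indices into chunks
// of at most maxBlocksBeforeSplit entries; each chunk maps to one GET request.
class ChunkSketch {
  static List<List<Integer>> splitGroupIntoChunks(List<Integer> group, int maxBlocksBeforeSplit) {
    List<List<Integer>> chunks = new ArrayList<>();
    for (int i = 0; i < group.size(); i += maxBlocksBeforeSplit) {
      int end = Math.min(i + maxBlocksBeforeSplit, group.size());
      chunks.add(new ArrayList<>(group.subList(i, end)));
    }
    return chunks;
  }
}
```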
.referrer(new Referrer(requestRange.toHttpString(), readMode))
.build();

openStreamInformation.getRequestCallback().onGetRequest();
This should happen after the this.objectClient.getObject() call in fetchObjectContent.

That makes sense, but in the existing code it is called before the request.
int blockSize = block.getLength();

// Skip bytes if there's a gap between current position and block start
if (!skipToBlockStart(inputStream, blockStart, currentPos)) {
It is not clear to me why you need to do skipToBlockStart; can you please explain with an example?

Added an example inside the comment.
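A minimal sketch of what such a skip helper might look like (hypothetical; method and parameter names mirror the quoted signature but the body is an assumption). It advances the SDK stream from its current position to the start of the next block to fill, e.g. when a block in the middle of the requested range is already cached and its bytes must be discarded from the stream:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: advance the stream from currentPos up to blockStart,
// returning false if the stream ends before reaching the block start.
class SkipSketch {
  static boolean skipToBlockStart(InputStream in, long blockStart, long currentPos)
      throws IOException {
    long toSkip = blockStart - currentPos;
    while (toSkip > 0) {
      long skipped = in.skip(toSkip);
      if (skipped <= 0) {
        // skip() may legally return 0; fall back to a one-byte read to make progress
        if (in.read() == -1) {
          return false; // stream ended before the block start
        }
        skipped = 1;
      }
      toSkip -= skipped;
    }
    return true;
  }
}
```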
*
* @param inputStream the input stream to skip bytes from
* @param blockStart the target start position of the block
* @param currentPos the current position in the stream
Nit: make this a bit clearer, mentioning that you mean the position in the SDK stream, not the parent seekable stream.
@SuppressWarnings("unchecked")
@ParameterizedTest
@MethodSource("exceptions")
@Disabled
Why is this test disabled? If it's due to the new implementation, it should either be fixed or removed entirely.

Thanks for spotting it. No idea why it was left disabled. I removed the tag.
private static final int DEFAULT_THREAD_POOL_SIZE = 96;
private static final long DEFAULT_READ_BUFFER_SIZE = 128 * ONE_KB;
private static final long DEFAULT_TARGET_REQUEST_SIZE = 8 * ONE_MB;
private static final double DEFAULT_REQUEST_TOLERANCE_RATIO = 1.4;
Why are we changing the default read buffer size to 128KB? And why the request tolerance to 1.4?
We are not changing it; we are adding a new configuration, and it is the max block size we want to store. With this new design our aim is to make requests as close as possible to DEFAULT_TARGET_REQUEST_SIZE. With the tolerance, we have flexibility to exceed DEFAULT_TARGET_REQUEST_SIZE. For example, if there is an 11MB request, we don't want to split it into 8MB + 3MB; we want one 11MB request. But if the total request is 12MB, we want to split it into 8MB + 4MB. These configurations can be tuned according to different performance test results.
Was this tolerance number of 1.4 based on some POCs?
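The arithmetic behind the example above, as a hypothetical sketch (names are illustrative): with an 8MB target and a 1.4 tolerance ratio, the split threshold is 8MB x 1.4 = 11.2MB, so an 11MB request stays whole while a 12MB request is split.

```java
// Hypothetical sketch of the tolerance check: a request is split only when it
// exceeds targetRequestSize * toleranceRatio.
class ToleranceSketch {
  static boolean shouldSplit(long requestSize, long targetRequestSize, double toleranceRatio) {
    return requestSize > targetRequestSize * toleranceRatio;
  }
}
```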
List<Integer> lastChunk = chunks.get(chunks.size() - 1);
List<Integer> previousChunk = chunks.get(chunks.size() - 2);

if (lastChunk.size() < previousChunk.size()
Do we need the lastChunk.size() < previousChunk.size() condition? It seems we could merge chunks regardless of their relative sizes, as long as the combined size is within maxBlocksBeforeSplit.
You are right, I will remove that check in the next commit.
Preconditions.checkArgument(readBufferSize > 0, "`readBufferSize` must be positive");
Preconditions.checkArgument(targetRequestSize > 0, "`targetRequestSize` must be positive");
Preconditions.checkArgument(
    requestToleranceRatio >= 1, "`requestToleranceRatio` must be greater or equal than 1");
nit: must be greater than or equal to 1
/** Shuts down the StreamReader object. */
@Override
public void close() {}
Do we need this method? If it's for future use, please add a comment.
if (getBlock(pos).isPresent()) {
  return OptionalLong.of(pos);
public void add(Block block) {
It might be worth considering what to do if a block with a different range but the same index is added
This is not possible, because the same index always stores the same range. Do you have a case in mind for your comment?
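A hypothetical sketch of why an index determines its range with fixed-size blocks (names are illustrative; the fixed block size is assumed to be the read buffer size). Because the mapping position -> index and index -> start offset are both pure functions of the block size, two blocks with the same index cannot cover different ranges:

```java
// Hypothetical sketch: with fixed-size blocks, a block index maps to exactly
// one byte range, so "same index, different range" cannot occur.
class BlockIndexSketch {
  static int indexFor(long pos, long blockSize) {
    return (int) (pos / blockSize);
  }

  static long startFor(int index, long blockSize) {
    return index * blockSize;
  }
}
```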
this.objectFormatSelector = new ObjectFormatSelector(configuration.getLogicalIOConfiguration());
this.objectBlobStore =
    new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration(), metrics);
// TODO: calling applications should be able to pass in a thread pool if they so wish
Let's create an issue for this TODO.
this.objectBlobStore =
    new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration(), metrics);
// TODO: calling applications should be able to pass in a thread pool if they so wish
this.threadPool =
Should this thread pool be a daemon?

Will follow up in a separate PR.
this.objectMetadataStore.close();
this.objectBlobStore.close();
this.telemetry.close();
this.threadPool.shutdown();
What if this close is never called?
String.format(
    "This block object key %s (for position %s) should have been available.",
    objectKey.getS3URI().toString(), nextPositionFinal)));
    objectKey.getS3URI(), nextPositionFinal)));
Do we have a toString override on S3URI now?

Yes, S3URI already has a toString() method.
? OptionalLong.of(nextMissingByte)
: OptionalLong.empty();
int blockIndex = getBlockIndex(block);
if (blocks.remove(blockIndex) != null && block.isDataReady()) {
Would we leak resources here if the block's data is not ready yet?

We handle that case in StreamReader: if StreamReader cannot fill the blocks, it removes the unfilled Block objects from the cache.
* @throws RuntimeException if all retries fails and an error occurs
*/
@SuppressWarnings("unchecked")
private RetryStrategy createRetryStrategy() {
We probably need to move this to Utils.

We create the retry strategy in two places: 1/ StreamReader and 2/ Block. Their handled exceptions are different, and in the future we may add different onRetry handlers for these functions, so we can keep them separate for now.
openStreamInformation.getRequestCallback().onGetRequest();

if (objectContent == null) {
Under what condition will this if be true? Wouldn't the catch clause above handle this?

There is no real-life use case for this, but it was added as a defensive check.
// Process the input stream and populate data blocks
try (InputStream inputStream = objectContent.getStream()) {
  boolean success =
      readBlocksFromStream(inputStream, blocks, requestRange.getStart());
I am a bit confused about where the actual reading from the network happens: in fetchObjectContent or readBlocksFromStream?

Whichever one reads bytes from the network should be the one we retry.
## Description of change
This PR implements a new PhysicalIO design with key improvements:
- **Fixed Block Size**: Previously, block sizes varied based on request ranges, requiring entire request ranges to complete before blocks became available. The new design uses fixed-size blocks that become ready as soon as individual blocks are filled, enabling faster data access and better parallelization.
- **Direct Block Writing**: Eliminates an extra memory copy by writing S3 data directly into Block storage instead of copying from intermediate buffers, reducing memory overhead and CPU usage.
- **Improved Concurrency**: Fixed-size blocks allow multiple blocks to be processed independently, improving throughput for concurrent read operations.
- **Better Memory Management**: Predictable block sizes enable more efficient memory allocation and cache management strategies.
- **Enhanced Read Performance**: Blocks become available for reading as soon as they're filled, rather than waiting for entire request ranges to complete, reducing read latency.
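The "Direct Block Writing" point can be sketched as follows. This is a hypothetical illustration (class and method names are assumptions, not the PR's actual code): the response stream is read straight into the block's backing array, with no intermediate buffer to copy from.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: fill a block's own byte array directly from the
// (S3 response) stream instead of copying through an intermediate buffer.
class DirectWriteSketch {
  static boolean fillBlock(InputStream in, byte[] blockData) throws IOException {
    int filled = 0;
    while (filled < blockData.length) {
      int n = in.read(blockData, filled, blockData.length - filled);
      if (n == -1) {
        return false; // stream ended before the block was completely filled
      }
      filled += n;
    }
    return true;
  }
}
```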
#### Relevant issues
#286 #287 #288 #289 #294 #316 #320 #323 #324
#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?
No
#### Does this contribution introduce any new public APIs or behaviors?
No
#### How was the contribution tested?
Unit tests, microbenchmarks
#### Does this contribution need a changelog entry?
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the Developer Certificate of Origin (DCO).