ahmarsuhail left a comment:
Thanks a lot @ozkoca, this is looking really good.
I have some questions, mainly about the block splitting/grouping logic, and how your implementation deals with errors in the reads. Let's discuss.
Have not reviewed tests yet, still just trying to understand how all of this is working. I think we'll definitely need some good documentation around this, as it's starting to get complicated!
```java
lock.readLock().lock();
blockManager.makePositionAvailable(pos, ReadMode.SYNC);
return blockManager.getBlock(pos).get().read(pos); // TODO add if block exist check
```
Why do you need the check here? I think previously we would throw an exception if we got here and the block did not exist. The makePositionAvailable call should have made the block available, right?
In the new design, a Block object's data is filled asynchronously by StreamReader. If an exception occurs while reading from the stream, StreamReader deletes the corresponding Block object from BlockManager, since it couldn't read the data. Previously these operations were synchronous, so Blob could rethrow any exception raised while reading the data.
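A minimal sketch of the fill-and-evict behaviour described above, assuming hypothetical names (`AsyncBlockStore`, `fillAsync`) rather than the actual AAL classes: the block is registered before the asynchronous read completes, and is evicted again if the read fails.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative sketch only; not the actual BlockManager/StreamReader implementation.
class AsyncBlockStore {
  private final Map<Long, CompletableFuture<byte[]>> blocks = new ConcurrentHashMap<>();

  CompletableFuture<byte[]> fillAsync(long pos, Supplier<byte[]> streamRead) {
    CompletableFuture<byte[]> data = CompletableFuture.supplyAsync(streamRead);
    blocks.put(pos, data);
    // On failure, evict the block so later readers re-fetch instead of seeing a broken entry.
    return data.whenComplete((bytes, err) -> {
      if (err != null) blocks.remove(pos);
    });
  }

  boolean contains(long pos) {
    return blocks.containsKey(pos);
  }
}
```

The consequence for the caller is exactly the review point: by the time a synchronous reader looks the block up, it may already have been removed, so the exception can no longer surface from Blob itself.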
```java
try {
  makeRangeAvailable(0, metadata.getContentLength(), ReadMode.SMALL_OBJECT_PREFETCH);
} catch (Exception e) { // previously: catch (IOException e)
```
What exceptions other than IOException are we throwing now?
There can be exceptions thrown while submitting a task to the thread pool, so we need to handle Exception.
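One concrete case of the above: `ExecutorService.submit` throws the unchecked `RejectedExecutionException` when the pool is shut down or saturated, which a `catch (IOException)` would not cover. A small illustration (names here are illustrative, not project code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Shows why catching only IOException misses thread-pool submission failures.
class SubmitExample {
  static boolean trySubmit(ExecutorService pool, Runnable task) {
    try {
      pool.submit(task);
      return true;
    } catch (RejectedExecutionException e) {
      // Unchecked, not an IOException: must be handled separately or via catch (Exception).
      return false;
    }
  }
}
```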
```java
if (missingBlockIndexes.isEmpty()) return;

// Split missing blocks into groups of sequential indexes that respect maximum range size
List<List<Integer>> groupedReads = splitReads(missingBlockIndexes);
```
I don't understand what's going on inside splitReads; we should discuss.
I am trying to understand what will happen if there are holes in these ranges/blocks. Assume we have 3-4-5 in memory and we have a read for blocks 1-2-3-4-5-6-7. What will the final S3 request(s) be?
We do not re-read blocks that are already in memory, so the missing blocks are 1-2-6-7. The existing implementation splits on contiguity, so the final requests will be 1-2 and 6-7. With future implementations, we will be able to merge these requests into one when the distance between ranges is within a reasonable amount, skipping the already-read blocks.
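The contiguity split described above can be sketched as follows. The method name `splitReads` comes from the diff; this body is an illustrative guess, not the actual implementation, and it assumes the input indexes are sorted ascending (as `getMissingBlockIndexesInRange` is said to guarantee).

```java
import java.util.ArrayList;
import java.util.List;

// Groups a sorted list of missing block indexes into contiguous runs:
// [1, 2, 6, 7] -> [[1, 2], [6, 7]]
class RangeGrouping {
  static List<List<Integer>> splitReads(List<Integer> missing) {
    List<List<Integer>> groups = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    for (int index : missing) {
      if (!current.isEmpty() && index != current.get(current.size() - 1) + 1) {
        groups.add(current); // gap found: close the current contiguous run
        current = new ArrayList<>();
      }
      current.add(index);
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}
```

With blocks 3-4-5 already in memory and a read for 1 through 7, the missing list is [1, 2, 6, 7] and the sketch produces the two runs [1, 2] and [6, 7], matching the two S3 requests described.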
```java
 * @param blocks the list of data blocks, must be non-empty and sorted by offset
 * @return the Range covering all blocks from first start to last end
 */
private Range computeRange(List<Block> blocks) {
```
Is there a guarantee that the ranges are sorted here?
The getMissingBlockIndexesInRange method returns the missing blocks in order. I didn't want to add another check here, to avoid impacting performance.
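One way to get the check without a production cost is a Java `assert`, which is a no-op unless assertions are enabled (`-ea`, e.g. in tests). A minimal sketch using hypothetical Block/Range shapes (collapsed into one class here; not the actual AAL types):

```java
import java.util.List;

// Hypothetical stand-in for both Block and Range: just a [start, end] pair.
class BlockRange {
  final long start;
  final long end;

  BlockRange(long start, long end) {
    this.start = start;
    this.end = end;
  }

  // Computes the single range covering a non-empty, offset-sorted block list.
  static BlockRange computeRange(List<BlockRange> blocks) {
    for (int i = 1; i < blocks.size(); i++) {
      // Fails fast under -ea if a caller ever passes unsorted blocks; free in production.
      assert blocks.get(i - 1).start <= blocks.get(i).start : "blocks not sorted by offset";
    }
    return new BlockRange(blocks.get(0).start, blocks.get(blocks.size() - 1).end);
  }
}
```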
```java
}

/**
 * Skips bytes in the input stream to reach the start position of a block. Handles cases where
```
An example in the javadocs would be super helpful to understand what we mean by non-contiguous blocks, e.g.: when we have two blocks [5,10] and [15,25], this will skip to position 15.
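For the example above, after finishing block [5,10] the reader must advance the stream to position 15 before filling block [15,25]. A detail worth capturing in the docs: `InputStream.skip` may skip fewer bytes than requested, so it has to be called in a loop. A sketch (illustrative, not the actual StreamReader code):

```java
import java.io.IOException;
import java.io.InputStream;

// Skips exactly bytesToSkip bytes, looping because InputStream.skip may
// legally skip fewer bytes than requested (or zero).
class SkipExample {
  static void skipFully(InputStream in, long bytesToSkip) throws IOException {
    long remaining = bytesToSkip;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped <= 0) {
        // skip() made no progress; fall back to a single-byte read to detect EOF.
        if (in.read() == -1) {
          throw new IOException("EOF while skipping " + remaining + " remaining bytes");
        }
        skipped = 1;
      }
      remaining -= skipped;
    }
  }
}
```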
@ozkoca Thanks a lot for the PR. I didn't have much time to read all the tests; I will do another pass.
I have a few nitpicks. My biggest concerns are about the RangeOptimiser class and the StreamReader class:
1/ It is really hard to reason about the 2 parameters in RangeOptimiser. I think with a single parameter we can try to make sure each S3 request is around targetRequestSizeMB. We should not use targetRequestSizeMB as a splitting boundary, because it will create small leftover requests (e.g. if targetRequestSizeMB = 8 and a read is 8.1MB, we shouldn't make 2 requests of 8MB and 0.1MB).
2/ It might be good to add correctness checks (blocks are ordered, no skips, etc.) to the StreamReader.
As a follow-up to this PR (not now) we should discuss how to tolerate gaps (e.g. you need blocks 1-2-3-4-5-6 and you already have 4 in memory: do we create 2 S3 requests or 1?). We should be able to tolerate some gaps in the read.
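The single-parameter splitting suggested in 1/ can be sketched as: pick the number of requests first (rounding total/target), then split the range evenly, so an 8.1MB read stays one request instead of an 8MB request plus a 0.1MB leftover. This is an illustrative sketch of the idea, not the RangeOptimiser API:

```java
// Splits totalBytes into requests that are all close to targetBytes in size,
// avoiding tiny leftover requests at the tail.
class EvenSplit {
  static long[] split(long totalBytes, long targetBytes) {
    // Number of requests: nearest integer to totalBytes/targetBytes, at least 1.
    int parts = (int) Math.max(1, Math.round((double) totalBytes / targetBytes));
    long[] sizes = new long[parts];
    long base = totalBytes / parts;
    long remainder = totalBytes % parts; // spread the remainder one byte per request
    for (int i = 0; i < parts; i++) {
      sizes[i] = base + (i < remainder ? 1 : 0);
    }
    return sizes;
  }
}
```

With target 8MB, an 8.1MB read rounds to one request of 8.1MB, while a 12MB read rounds to two requests of 6MB each, so every request stays close to the target.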
…alIO (awslabs#287)

## Description of change

- This change adopts the changes from [PR](awslabs#283) to the new Physical IO implementation.
- Updates the comment in the DataBlock object.

#### Relevant issues

[OpenStreamInformation PR](awslabs#283)
[Initial version of Physical IO](awslabs#286)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).

Co-authored-by: Erdogan Ozkoca <ozkoca@amazon.com>
…slabs#288)

## Description of change

This PR adopts the memory manager changes to the new PhysicalIO.

#### Relevant issues

PR History:
- awslabs#286
- awslabs#287

#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?

No

#### Does this contribution introduce any new public APIs or behaviors?

No

Co-authored-by: Erdogan Ozkoca <ozkoca@amazon.com>
## Description of change

This PR adds a new method, optimizeReads, to the RangeOptimiser class to improve read performance by intelligently grouping and splitting block indexes. The implementation reduces the complexity in DataBlockManager and makes the optimization logic more testable. Changes:

- Adds readAheadBytes logic
- Adds sequential prefetching logic
- Groups sequential block indexes together
- Splits large sequential groups into smaller chunks based on configuration parameters
- Refactors DataBlockManager to use the new method instead of implementing the logic itself
- Adds comprehensive unit tests for the new method

Out of scope:
- Range coalescing will be implemented in a separate PR

#### Relevant issues

PR History: awslabs#286, awslabs#287, awslabs#288

#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?

No

#### Does this contribution introduce any new public APIs or behaviors?

No
## Description of change

This PR merges the new PhysicalIO changes into the Blob object and starts using the new implementation.

Next steps:
- Range coalescing implementation
- Retry policy implementation

#### Relevant issues

PR History: awslabs#286, awslabs#287, awslabs#288, awslabs#289

#### Does this contribution introduce any breaking changes to the existing APIs or behaviors?

No

#### Does this contribution introduce any new public APIs or behaviors?

No

#### How was the contribution tested?

Unit tests

#### Does this contribution need a changelog entry?

N/A
Moved to #316 to merge into the feature branch.
Description of change
This PR includes changes related to the new PhysicalIO design.
Relevant issues
#286
#287
#288
#289
#294
Does this contribution introduce any breaking changes to the existing APIs or behaviors?
No
Does this contribution introduce any new public APIs or behaviors?
No
How was the contribution tested?
Unit tests, microbenchmarks
Does this contribution need a changelog entry?
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the Developer Certificate of Origin (DCO).