diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java index 8987ff69..7a1e71a0 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfiguration.java @@ -47,6 +47,7 @@ public class PhysicalIOConfiguration { private static final boolean DEFAULT_SMALL_OBJECTS_PREFETCHING_ENABLED = true; private static final long DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD = 8 * ONE_MB; private static final int DEFAULT_THREAD_POOL_SIZE = 96; + private static final long DEFAULT_READ_BUFFER_SIZE = 8 * ONE_KB; /** * Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_MEMORY_CAPACITY_BYTES} by default. @@ -151,6 +152,9 @@ public class PhysicalIOConfiguration { @Builder.Default private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE; + private static final String READ_BUFFER_SIZE_KEY = "readbuffersize"; + @Builder.Default private long readBufferSize = DEFAULT_READ_BUFFER_SIZE; + /** Default set of settings for {@link PhysicalIO} */ public static final PhysicalIOConfiguration DEFAULT = PhysicalIOConfiguration.builder().build(); @@ -192,6 +196,7 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c configuration.getLong( SMALL_OBJECT_SIZE_THRESHOLD_KEY, DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD)) .threadPoolSize(configuration.getInt(THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE)) + .readBufferSize(configuration.getLong(READ_BUFFER_SIZE_KEY, DEFAULT_READ_BUFFER_SIZE)) .build(); } @@ -215,6 +220,7 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c * @param smallObjectsPrefetchingEnabled Whether small object prefetching is enabled * @param smallObjectSizeThreshold Maximum size in bytes for an object to be 
considered small * @param threadPoolSize Size of thread pool to be used for async operations + * @param readBufferSize Size of the maximum buffer for read operations */ @Builder private PhysicalIOConfiguration( @@ -232,7 +238,8 @@ private PhysicalIOConfiguration( int blockReadRetryCount, boolean smallObjectsPrefetchingEnabled, long smallObjectSizeThreshold, - int threadPoolSize) { + int threadPoolSize, + long readBufferSize) { Preconditions.checkArgument(memoryCapacityBytes > 0, "`memoryCapacityBytes` must be positive"); Preconditions.checkArgument( memoryCleanupFrequencyMilliseconds > 0, @@ -254,6 +261,7 @@ private PhysicalIOConfiguration( Preconditions.checkArgument( smallObjectSizeThreshold > 0, "`smallObjectSizeThreshold` must be positive"); Preconditions.checkNotNull(threadPoolSize > 0, "`threadPoolSize` must be positive"); + Preconditions.checkArgument(readBufferSize > 0, "`readBufferSize` must be positive"); this.memoryCapacityBytes = memoryCapacityBytes; this.memoryCleanupFrequencyMilliseconds = memoryCleanupFrequencyMilliseconds; @@ -270,6 +278,7 @@ private PhysicalIOConfiguration( this.smallObjectsPrefetchingEnabled = smallObjectsPrefetchingEnabled; this.smallObjectSizeThreshold = smallObjectSizeThreshold; this.threadPoolSize = threadPoolSize; + this.readBufferSize = readBufferSize; } @Override @@ -293,6 +302,7 @@ public String toString() { builder.append("\tsmallObjectsPrefetchingEnabled: " + smallObjectsPrefetchingEnabled + "\n"); builder.append("\tsmallObjectSizeThreshold: " + smallObjectSizeThreshold + "\n"); builder.append("\tthreadPoolSize: " + threadPoolSize + "\n"); + builder.append("\treadBufferSize: " + readBufferSize + "\n"); return builder.toString(); } diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlock.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlock.java new file mode 100644 index 00000000..c9c386d5 --- /dev/null +++ 
b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlock.java @@ -0,0 +1,134 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.s3.analyticsaccelerator.io.physical.data; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.NonNull; +import software.amazon.s3.analyticsaccelerator.common.Preconditions; +import software.amazon.s3.analyticsaccelerator.util.BlockKey; + +/** Block object stores the data of a stream */ +public class DataBlock implements Closeable { + /** + * The data of the block, set after construction via {@link #setData(byte[])}. Accessed only after + * ensuring readiness via {@link #awaitData()}. 
+   */
+  @Nullable private byte[] data;
+
+  @Getter private final BlockKey blockKey;
+  @Getter private final long generation;
+  private final CountDownLatch dataReadyLatch = new CountDownLatch(1);
+
+  /**
+   * Constructs a DataBlock object
+   *
+   * @param blockKey the objectKey and range of the object
+   * @param generation generation of the block in a sequential read pattern
+   */
+  public DataBlock(@NonNull BlockKey blockKey, long generation) {
+    long start = blockKey.getRange().getStart();
+    long end = blockKey.getRange().getEnd();
+    Preconditions.checkArgument(
+        0 <= generation, "`generation` must be non-negative; was: %s", generation);
+    Preconditions.checkArgument(0 <= start, "`start` must be non-negative; was: %s", start);
+    Preconditions.checkArgument(0 <= end, "`end` must be non-negative; was: %s", end);
+
+    this.blockKey = blockKey;
+    this.generation = generation;
+  }
+
+  /**
+   * Reads a byte from the underlying object. Blocks until {@link #setData(byte[])} has been called.
+   *
+   * @param pos The position to read
+   * @return an unsigned int representing the byte that was read
+   * @throws IOException if the data could not be loaded or the wait was interrupted
+   */
+  public int read(long pos) throws IOException {
+    Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
+    final byte[] content = awaitData();
+    return Byte.toUnsignedInt(content[posToOffset(pos)]);
+  }
+
+  /**
+   * Reads data into the provided buffer. Blocks until {@link #setData(byte[])} has been called.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len length of data to be read
+   * @param pos the position to begin reading from
+   * @return the total number of bytes read into the buffer; {@code 0} when {@code pos} lies at or
+   *     beyond the end of the data held by this block (never negative)
+   * @throws IOException if the data could not be loaded or the wait was interrupted
+   */
+  public int read(byte @NonNull [] buf, int off, int len, long pos) throws IOException {
+    Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
+    Preconditions.checkArgument(0 <= off, "`off` must not be negative");
+    Preconditions.checkArgument(0 <= len, "`len` must not be negative");
+    Preconditions.checkArgument(off < buf.length, "`off` must be less than size of buffer");
+
+    final byte[] content = awaitData();
+
+    int contentOffset = posToOffset(pos);
+    int available = content.length - contentOffset;
+    // `available` is negative when `pos` is past the data; clamp so callers never see a
+    // negative byte count
+    int bytesToCopy = Math.max(0, Math.min(len, available));
+
+    // Skip the copy entirely for zero bytes: arraycopy still validates the source offset
+    if (bytesToCopy > 0) System.arraycopy(content, contentOffset, buf, off, bytesToCopy);
+
+    return bytesToCopy;
+  }
+
+  /**
+   * Determines the offset in the Block corresponding to a position in an object.
+   *
+   * @param pos the position of a byte in the object
+   * @return the offset in the byte buffer underlying this Block
+   */
+  private int posToOffset(long pos) {
+    return (int) (pos - this.blockKey.getRange().getStart());
+  }
+
+  /**
+   * Method to set data and reduce the dataReadyLatch to signal that data is ready. Passing {@code
+   * null} still releases waiting readers, which then fail with an {@link IOException}.
+   *
+   * @param data data of the block
+   */
+  public void setData(final byte[] data) {
+    this.data = data;
+    dataReadyLatch.countDown();
+  }
+
+  /**
+   * Method to wait until data is fully loaded
+   *
+   * @return the loaded data, never {@code null}
+   * @throws IOException if the wait is interrupted or no data is available after the latch opens
+   */
+  private byte[] awaitData() throws IOException {
+    try {
+      dataReadyLatch.await();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new IOException("Failed to read data", e);
+    }
+
+    // Read the field once: close() may null it out between a check and a later use
+    final byte[] content = this.data;
+    if (content == null) throw new IOException("Failed to read data");
+    return content;
+  }
+
+  /** Closes the {@link DataBlock} and frees up all resources it holds */
+  @Override
+  public void close() throws IOException {
+    this.data = null;
+  }
+}
diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockManager.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockManager.java
new file mode 100644
index 00000000..d6ded15b
--- /dev/null
+++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockManager.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). 
+ * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.s3.analyticsaccelerator.io.physical.data; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import lombok.NonNull; +import software.amazon.s3.analyticsaccelerator.common.Metrics; +import software.amazon.s3.analyticsaccelerator.common.Preconditions; +import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry; +import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration; +import software.amazon.s3.analyticsaccelerator.io.physical.reader.StreamReader; +import software.amazon.s3.analyticsaccelerator.request.*; +import software.amazon.s3.analyticsaccelerator.util.BlockKey; +import software.amazon.s3.analyticsaccelerator.util.ObjectKey; + +/** Implements a Block Manager responsible for planning and scheduling reads on a key. 
*/
+public class DataBlockManager implements Closeable {
+  private final ObjectKey objectKey;
+  private final ObjectMetadata metadata;
+
+  @SuppressFBWarnings(
+      value = "URF_UNREAD_FIELD",
+      justification = "Field is injected and may be used in the future")
+  private final Telemetry telemetry;
+
+  private final PhysicalIOConfiguration configuration;
+
+  @SuppressFBWarnings(
+      value = "URF_UNREAD_FIELD",
+      justification = "Field is injected and may be used in the future")
+  private final Metrics aggregatingMetrics;
+
+  @SuppressFBWarnings(
+      value = "URF_UNREAD_FIELD",
+      justification = "Field is injected and may be used in the future")
+  private final BlobStoreIndexCache indexCache;
+
+  private final StreamReader streamReader;
+
+  private final DataBlockStore blockStore;
+
+  /**
+   * Constructs a new BlockManager.
+   *
+   * @param objectKey the key representing the S3 object, including its URI and ETag
+   * @param objectClient the client used to fetch object content from S3
+   * @param metadata metadata associated with the S3 object, including content length
+   * @param telemetry the telemetry interface used for logging or instrumentation
+   * @param configuration configuration for physical IO operations (e.g., read buffer size)
+   * @param aggregatingMetrics the metrics aggregator for performance or usage monitoring
+   * @param indexCache cache for blob index metadata (if applicable)
+   * @param threadPool Thread pool
+   */
+  public DataBlockManager(
+      @NonNull ObjectKey objectKey,
+      @NonNull ObjectClient objectClient,
+      @NonNull ObjectMetadata metadata,
+      @NonNull Telemetry telemetry,
+      @NonNull PhysicalIOConfiguration configuration,
+      @NonNull Metrics aggregatingMetrics,
+      @NonNull BlobStoreIndexCache indexCache,
+      @NonNull ExecutorService threadPool) {
+    this(
+        objectKey,
+        objectClient,
+        metadata,
+        telemetry,
+        configuration,
+        aggregatingMetrics,
+        indexCache,
+        threadPool,
+        null);
+  }
+
+  /**
+   * Constructs a new BlockManager.
+   *
+   * @param objectKey the key representing the S3 object, including its URI and ETag
+   * @param objectClient the client used to fetch object content from S3
+   * @param metadata metadata associated with the S3 object, including content length
+   * @param telemetry the telemetry interface used for logging or instrumentation
+   * @param configuration configuration for physical IO operations (e.g., read buffer size)
+   * @param aggregatingMetrics the metrics aggregator for performance or usage monitoring
+   * @param indexCache cache for blob index metadata (if applicable)
+   * @param threadPool Thread pool
+   * @param streamContext context for stream-based reads, e.g., buffering or retry behavior
+   */
+  public DataBlockManager(
+      @NonNull ObjectKey objectKey,
+      @NonNull ObjectClient objectClient,
+      @NonNull ObjectMetadata metadata,
+      @NonNull Telemetry telemetry,
+      @NonNull PhysicalIOConfiguration configuration,
+      @NonNull Metrics aggregatingMetrics,
+      @NonNull BlobStoreIndexCache indexCache,
+      @NonNull ExecutorService threadPool,
+      StreamContext streamContext) {
+    this.objectKey = objectKey;
+    this.metadata = metadata;
+    this.telemetry = telemetry;
+    this.configuration = configuration;
+    this.aggregatingMetrics = aggregatingMetrics;
+    this.indexCache = indexCache;
+    this.streamReader = new StreamReader(objectClient, objectKey, threadPool, streamContext);
+    this.blockStore = new DataBlockStore(configuration);
+  }
+
+  /**
+   * Make sure that the byte at a given position is in the BlockStore.
+   *
+   * @param pos the position of the byte
+   * @param readMode whether this ask corresponds to a sync or async read
+   */
+  public synchronized void makePositionAvailable(long pos, ReadMode readMode) {
+    Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
+
+    if (getBlock(pos).isPresent()) return;
+
+    makeRangeAvailable(pos, 1, readMode);
+  }
+
+  /**
+   * Method that ensures that a range is fully available in the object store. After calling this
+   * method the BlockStore should contain a block for every byte of the range that exists in the
+   * object, so a read can be serviced through the BlockStore. The blocks are registered
+   * immediately; their data is filled in by the {@link StreamReader}.
+   *
+   * @param pos start of a read
+   * @param len length of the read
+   * @param readMode whether this ask corresponds to a sync or async read
+   */
+  public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMode) {
+    Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
+    Preconditions.checkArgument(0 <= len, "`len` must not be negative");
+
+    // Clamp to the object end, like getBlocks does, so no block is planned past the last byte
+    long endPos = Math.min(pos + len - 1, getLastObjectByte());
+
+    // Nothing to fetch for an empty read or a read that starts beyond the end of the object
+    if (endPos < pos) return;
+
+    // Find missing blocks for given range
+    List<Integer> missingBlockIndexes = blockStore.getMissingBlockIndexesInRange(pos, endPos);
+
+    // Return if all blocks are in store
+    if (missingBlockIndexes.isEmpty()) return;
+
+    final long blockSize = configuration.getReadBufferSize();
+    List<DataBlock> blocksToFill = new ArrayList<>(missingBlockIndexes.size());
+    for (int blockIndex : missingBlockIndexes) {
+      long blockStart = blockIndex * blockSize;
+      // Range ends are inclusive: a block's last byte is the one just before the next block's
+      // first byte. Using the next block's start here would make neighbouring blocks overlap.
+      long blockEnd = Math.min(blockStart + blockSize - 1, getLastObjectByte());
+      BlockKey blockKey = new BlockKey(objectKey, new Range(blockStart, blockEnd));
+      DataBlock block = new DataBlock(blockKey, 0);
+      blockStore.add(block);
+      blocksToFill.add(block);
+    }
+
+    streamReader.read(blocksToFill, readMode);
+  }
+
+  /**
+   * Retrieves all {@link DataBlock}s that cover the specified byte range {@code [pos, pos + len)}.
+   *
+   * @param pos the starting byte position of the desired range (inclusive)
+   * @param len the number of bytes to include in the range
+   * @return a list of {@link DataBlock}s that together cover the specified range
+   */
+  public synchronized List<DataBlock> getBlocks(long pos, long len) {
+    // TODO This method assumes that all required blocks are already present in the BlockStore.
+    // If any block is missing, code will throw exception. We need to handle this case
+    int startBlockIndex = getPositionIndex(pos);
+    int endBlockIndex = getPositionIndex(Math.min(pos + len - 1, getLastObjectByte()));
+
+    List<DataBlock> blocks = new ArrayList<>();
+    for (int index = startBlockIndex; index <= endBlockIndex; index++) {
+      Optional<DataBlock> block = blockStore.getBlockByIndex(index);
+      if (!block.isPresent()) {
+        // Same exception type Optional#get would raise, but naming the missing block
+        throw new java.util.NoSuchElementException(
+            "Block at index " + index + " is not present in the block store");
+      }
+      blocks.add(block.get());
+    }
+    return blocks;
+  }
+
+  /**
+   * Maps a byte position to the index of the block containing it.
+   *
+   * @param pos the byte position within the object
+   * @return the block index
+   * @throws ArithmeticException if the index does not fit in an {@code int}
+   */
+  private int getPositionIndex(long pos) {
+    // Fail loudly instead of silently wrapping to a wrong index for tiny configured block sizes
+    return Math.toIntExact(pos / this.configuration.getReadBufferSize());
+  }
+
+  private long getLastObjectByte() {
+    return this.metadata.getContentLength() - 1;
+  }
+
+  /**
+   * Retrieves the {@link DataBlock} containing the given position, if it exists in the block store.
+   *
+   * @param pos the byte position within the object to look up
+   * @return an {@link Optional} containing the {@link DataBlock} if present; otherwise, {@link
+   *     Optional#empty()}
+   */
+  public synchronized Optional<DataBlock> getBlock(long pos) {
+    return this.blockStore.getBlock(pos);
+  }
+
+  /**
+   * Checks whether the {@link DataBlockStore} currently holds any blocks.
+   *
+   * @return {@code true} if the block store is empty; {@code false} otherwise
+   */
+  public boolean isBlockStoreEmpty() {
+    return this.blockStore.isEmpty();
+  }
+
+  /** cleans data from memory; currently a no-op */
+  public void cleanUp() {}
+
+  /**
+   * Closes the {@link DataBlockManager}. Currently a no-op: neither the block store nor the stream
+   * reader is closed here.
+   */
+  @Override
+  public void close() {}
+}
diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockStore.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockStore.java
new file mode 100644
index 00000000..e6368913
--- /dev/null
+++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockStore.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). 
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.s3.analyticsaccelerator.io.physical.data;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.NonNull;
+import software.amazon.s3.analyticsaccelerator.common.Preconditions;
+import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
+
+/**
+ * A container that manages a collection of {@link DataBlock} instances. Each {@code DataBlock}
+ * corresponds to a fixed-size chunk of data based on the configured block size. This class provides
+ * methods to retrieve, add, and track missing blocks within a specified data range.
+ */
+public class DataBlockStore implements Closeable {
+  private final PhysicalIOConfiguration configuration;
+  // It is safe to use Integer as key since maximum single file size is 5TB in S3
+  // and if we assume that block size will be 8KB, total number of blocks is within range
+  // 5 TB / 8 KB = (5 * 1024^4) / 8192 ≈ 671,088,640 blocks
+  // Max int value = 2,147,483,647
+  // The block size is configurable, so getPositionIndex fails loudly if an index ever
+  // exceeds the int range instead of wrapping around.
+  private final Map<Integer, DataBlock> blocks;
+
+  /**
+   * Creates a new {@link DataBlockStore} with the specified configuration.
+   *
+   * @param configuration the {@link PhysicalIOConfiguration} used to define block size and other
+   *     I/O settings
+   */
+  public DataBlockStore(@NonNull PhysicalIOConfiguration configuration) {
+    this.configuration = configuration;
+    blocks = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Retrieves the {@link DataBlock} containing the byte at the specified position, if it exists.
+   *
+   * @param pos the byte offset to locate
+   * @return an {@link Optional} containing the {@code DataBlock} if found, or empty if not present
+   */
+  public Optional<DataBlock> getBlock(long pos) {
+    Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
+    return getBlockByIndex(getPositionIndex(pos));
+  }
+
+  /**
+   * Retrieves the {@link DataBlock} at the specified index from the block store.
+   *
+   * @param index the index of the block to retrieve
+   * @return an {@link Optional} containing the {@link DataBlock} if present; otherwise, an empty
+   *     {@link Optional}
+   */
+  public Optional<DataBlock> getBlockByIndex(int index) {
+    return Optional.ofNullable(blocks.get(index));
+  }
+
+  /**
+   * Adds a new {@link DataBlock} to the store if a block at the corresponding index doesn't already
+   * exist. The index is derived from the start of the block's range.
+   *
+   * @param block the {@code DataBlock} to add
+   */
+  public void add(DataBlock block) {
+    this.blocks.putIfAbsent(getBlockIndex(block), block);
+  }
+
+  /**
+   * Returns the list of block indexes that are missing for the given byte range.
+   *
+   * @param startPos the starting byte position (inclusive)
+   * @param endPos the ending byte position (inclusive)
+   * @return a list of missing block indexes within the specified range, in ascending order; empty
+   *     when the start index is greater than the end index
+   */
+  public List<Integer> getMissingBlockIndexesInRange(long startPos, long endPos) {
+    return getMissingBlockIndexesInRange(getPositionIndex(startPos), getPositionIndex(endPos));
+  }
+
+  // TODO Consider using Range, otherwise add Preconditions to check start and end indexes
+  private List<Integer> getMissingBlockIndexesInRange(int startIndex, int endIndex) {
+    List<Integer> missingBlockIndexes = new ArrayList<>();
+
+    for (int i = startIndex; i <= endIndex; i++) {
+      if (!blocks.containsKey(i)) missingBlockIndexes.add(i);
+    }
+    return missingBlockIndexes;
+  }
+
+  private int getBlockIndex(DataBlock block) {
+    return getPositionIndex(block.getBlockKey().getRange().getStart());
+  }
+
+  /**
+   * Maps a byte position to the index of the block containing it.
+   *
+   * @param pos the byte position within the object
+   * @return the block index
+   * @throws ArithmeticException if the index does not fit in an {@code int}
+   */
+  private int getPositionIndex(long pos) {
+    // A plain (int) cast would silently wrap for very small configured block sizes
+    return Math.toIntExact(pos / this.configuration.getReadBufferSize());
+  }
+
+  /**
+   * Closes all {@link DataBlock} instances in the store and clears the internal map. This should be
+   * called to release any underlying resources or memory.
+   *
+   * @throws IOException if an I/O error occurs during block closure
+   */
+  @Override
+  public void close() throws IOException {
+    // TODO Memory Manager
+    for (DataBlock block : blocks.values()) {
+      block.close();
+    }
+    blocks.clear();
+  }
+
+  /**
+   * Returns true if blockstore is empty
+   *
+   * @return true if blockstore is empty
+   */
+  public boolean isEmpty() {
+    return this.blocks.isEmpty();
+  }
+}
diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java
new file mode 100644
index 00000000..09a60acb
--- /dev/null
+++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/io/physical/reader/StreamReader.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright Amazon.com, Inc. 
or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.s3.analyticsaccelerator.io.physical.reader; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.ExecutorService; +import lombok.NonNull; +import software.amazon.s3.analyticsaccelerator.common.Preconditions; +import software.amazon.s3.analyticsaccelerator.io.physical.data.DataBlock; +import software.amazon.s3.analyticsaccelerator.request.*; +import software.amazon.s3.analyticsaccelerator.util.ObjectKey; + +/** + * {@code StreamReader} is responsible for asynchronously reading a range of bytes from an object in + * S3 and populating the corresponding {@link DataBlock}s with the downloaded data. + * + *

It submits the read task to a provided {@link ExecutorService}, allowing non-blocking
+ * operation.
+ */
+public class StreamReader implements Closeable {
+  private final ObjectClient objectClient;
+  private final ObjectKey objectKey;
+  private final ExecutorService threadPool;
+  private final StreamContext streamContext;
+
+  /**
+   * Constructs a {@code StreamReader} instance for reading objects from S3.
+   *
+   * @param objectClient the client used to fetch S3 object content
+   * @param objectKey the key identifying the S3 object and its ETag
+   * @param threadPool an {@link ExecutorService} used for async I/O operations
+   * @param streamContext the context containing metrics, logging, or monitoring information
+   */
+  public StreamReader(
+      @NonNull ObjectClient objectClient,
+      @NonNull ObjectKey objectKey,
+      @NonNull ExecutorService threadPool,
+      StreamContext streamContext) {
+    this.objectClient = objectClient;
+    this.objectKey = objectKey;
+    this.threadPool = threadPool;
+    this.streamContext = streamContext;
+  }
+
+  /**
+   * Asynchronously reads a range of bytes from the S3 object and fills the corresponding {@link
+   * DataBlock}s with data. The byte range is determined by the start of the first block and the end
+   * of the last block.
+   *
+   * @param blocks the list of {@link DataBlock}s to be populated; must not be empty and must be
+   *     sorted by offset
+   * @param readMode the mode in which the read is being performed (used for tracking or metrics)
+   * @throws IllegalArgumentException if the {@code blocks} list is empty
+   * @implNote This method uses a fire-and-forget strategy and doesn't return a {@code Future};
+   *     I/O failures are wrapped in a {@code RuntimeException} inside the submitted task. Whenever
+   *     the task ends without having filled every block, the unfilled blocks receive {@code null}
+   *     data so that their readers fail with an {@link IOException} instead of waiting forever.
+   */
+  @SuppressFBWarnings(
+      value = "RV_RETURN_VALUE_IGNORED",
+      justification = "Intentional fire-and-forget task")
+  public void read(@NonNull final List<DataBlock> blocks, ReadMode readMode) {
+    Preconditions.checkArgument(!blocks.isEmpty(), "`blocks` list must not be empty");
+
+    long rangeStart = blocks.get(0).getBlockKey().getRange().getStart();
+    long rangeEnd = blocks.get(blocks.size() - 1).getBlockKey().getRange().getEnd();
+    final Range requestRange = new Range(rangeStart, rangeEnd);
+
+    threadPool.submit(
+        () -> {
+          // Number of leading blocks that have already received their data
+          int filledBlocks = 0;
+          try {
+            GetRequest getRequest =
+                GetRequest.builder()
+                    .s3Uri(objectKey.getS3URI())
+                    .range(requestRange)
+                    .etag(objectKey.getEtag())
+                    .referrer(new Referrer(requestRange.toHttpString(), readMode))
+                    .build();
+
+            ObjectContent objectContent = objectClient.getObject(getRequest, streamContext).join();
+
+            try (InputStream inputStream = objectContent.getStream()) {
+              long currentOffset = rangeStart;
+              for (DataBlock block : blocks) {
+                long blockStart = block.getBlockKey().getRange().getStart();
+                long blockEnd = block.getBlockKey().getRange().getEnd();
+                int blockSize = (int) (blockEnd - blockStart + 1);
+
+                // The requested blocks need not be contiguous: skip the bytes between them
+                long skipBytes = blockStart - currentOffset;
+                if (skipBytes > 0) {
+                  skipFully(inputStream, skipBytes);
+                  currentOffset += skipBytes;
+                }
+
+                block.setData(readFully(inputStream, blockSize));
+                filledBlocks++;
+
+                currentOffset += blockSize;
+              }
+            }
+          } catch (IOException e) {
+            // TODO handle failure cases gracefully
+            throw new RuntimeException("Unexpected error while reading from stream", e);
+          } finally {
+            // On any failure (including a failed GET) release the readers of the blocks that
+            // never got data; after a fully successful run this loop does nothing.
+            for (int i = filledBlocks; i < blocks.size(); i++) {
+              blocks.get(i).setData(null);
+            }
+          }
+        });
+  }
+
+  /**
+   * Skips exactly {@code bytesToSkip} bytes of the stream.
+   *
+   * @param inputStream the stream to advance
+   * @param bytesToSkip number of bytes to skip
+   * @throws IOException if the stream ends before all bytes were skipped
+   */
+  private static void skipFully(InputStream inputStream, long bytesToSkip) throws IOException {
+    long remaining = bytesToSkip;
+    while (remaining > 0) {
+      long skipped = inputStream.skip(remaining);
+      if (skipped > 0) {
+        remaining -= skipped;
+      } else if (inputStream.read() == -1) {
+        // skip() may return 0 without being at the end; a one-byte read tells the cases apart
+        throw new IOException("Failed to skip required number of bytes in stream");
+      } else {
+        remaining--;
+      }
+    }
+  }
+
+  /**
+   * Reads exactly {@code length} bytes from the stream.
+   *
+   * @param inputStream the stream to read from
+   * @param length number of bytes to read
+   * @return a new array holding the bytes read
+   * @throws IOException if the stream ends before {@code length} bytes were read
+   */
+  private static byte[] readFully(InputStream inputStream, int length) throws IOException {
+    byte[] buffer = new byte[length];
+    int totalRead = 0;
+    while (totalRead < length) {
+      int bytesRead = inputStream.read(buffer, totalRead, length - totalRead);
+      if (bytesRead == -1) {
+        throw new IOException("Unexpected end of stream while reading block data");
+      }
+      totalRead += bytesRead;
+    }
+    return buffer;
+  }
+
+  /**
+   * Closes the underlying {@link ObjectClient} and shuts down the thread pool used for asynchronous
+   * execution. Both are supplied by the caller through the constructor, so closing this reader
+   * also affects any other component that uses the same instances.
+   *
+   * @throws IOException if the {@code objectClient} fails to close properly
+   */
+  @Override
+  public void close() throws IOException {
+    this.objectClient.close();
+    this.threadPool.shutdown();
+  }
+}
diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java
index 3a086d47..3fc12f56 100644
--- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java
+++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/PhysicalIOConfigurationTest.java
@@ -77,6 +77,7 @@ void testToString() {
             + "\tblockReadRetryCount: 20\n"
             + "\tsmallObjectsPrefetchingEnabled: true\n"
             + "\tsmallObjectSizeThreshold: 8388608\n"
-            + "\tthreadPoolSize: 96\n");
+            + "\tthreadPoolSize: 96\n"
+            + "\treadBufferSize: 8192\n");
   }
 }
diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockTest.java
new file mode 100644
index 00000000..333896e2
--- /dev/null
+++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/io/physical/data/DataBlockTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package software.amazon.s3.analyticsaccelerator.io.physical.data;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.junit.jupiter.api.Test;
+import software.amazon.s3.analyticsaccelerator.request.Range;
+import software.amazon.s3.analyticsaccelerator.util.BlockKey;
+import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+/** Unit tests for {@link DataBlock}. */
+public class DataBlockTest {
+  private static final S3URI TEST_URI = S3URI.of("foo", "bar");
+  private static final String ETAG = "RandomString";
+  private static final ObjectKey TEST_OBJECT_KEY =
+      ObjectKey.builder().s3URI(TEST_URI).etag(ETAG).build();
+  private static final byte[] TEST_DATA_BYTES = "test-data".getBytes(StandardCharsets.UTF_8);
+
+  @Test
+  void testValidConstructor() {
+    Range range = new Range(0, 10);
+    BlockKey blockKey = new BlockKey(TEST_OBJECT_KEY, range);
+
+    DataBlock block = new DataBlock(blockKey, 2);
+
+    // JUnit convention: expected value first, actual value second
+    assertEquals(blockKey, block.getBlockKey());
+    assertEquals(2, block.getGeneration());
+  }
+
+  @Test
+  void testNegativeGenerationThrows() {
+    Range range = new Range(0, 10);
+    BlockKey blockKey = new BlockKey(TEST_OBJECT_KEY, range);
+
+    assertThrows(IllegalArgumentException.class, () -> new DataBlock(blockKey, -1));
+  }
+
+  @Test
+  void testReadSingleByteAfterSetData() throws IOException {
+    Range range = new Range(0, TEST_DATA_BYTES.length - 1);
+    DataBlock block = new DataBlock(new BlockKey(TEST_OBJECT_KEY, range), 0);
+
+    block.setData(TEST_DATA_BYTES);
+
+    assertEquals(Byte.toUnsignedInt(TEST_DATA_BYTES[0]), block.read(0));
+  }
+
+  @Test
+  void testReadIntoBufferAfterSetData() throws IOException {
+    Range range = new Range(0, TEST_DATA_BYTES.length - 1);
+    DataBlock block = new DataBlock(new BlockKey(TEST_OBJECT_KEY, range), 0);
+    block.setData(TEST_DATA_BYTES);
+
+    byte[] whole = new byte[TEST_DATA_BYTES.length];
+    assertEquals(TEST_DATA_BYTES.length, block.read(whole, 0, whole.length, 0));
+    assertArrayEquals(TEST_DATA_BYTES, whole);
+
+    // "test-data": the four bytes starting at position 5 spell "data"
+    byte[] tail = new byte[4];
+    assertEquals(4, block.read(tail, 0, tail.length, 5));
+    assertEquals("data", new String(tail, StandardCharsets.UTF_8));
+  }
+}