Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ tasks.named<Test>("test") {
}

tasks.test {
maxHeapSize = "2G"
// Report is generated and verification is run after tests
finalizedBy(tasks.jacocoTestReport, tasks.jacocoTestCoverageVerification)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,17 @@ public S3SeekableInputStreamFactory(
new MetadataStore(
objectClient, telemetry, configuration.getPhysicalIOConfiguration(), metrics);
this.objectFormatSelector = new ObjectFormatSelector(configuration.getLogicalIOConfiguration());
this.objectBlobStore =
new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration(), metrics);
// TODO: calling applications should be able to pass in a thread pool if they so wish
this.threadPool =
Executors.newFixedThreadPool(
configuration.getPhysicalIOConfiguration().getThreadPoolSize());
this.objectBlobStore =
new BlobStore(
objectClient,
telemetry,
configuration.getPhysicalIOConfiguration(),
metrics,
threadPool);
objectBlobStore.schedulePeriodicCleanup();
}

Expand Down Expand Up @@ -195,5 +200,6 @@ public void close() throws IOException {
this.objectMetadataStore.close();
this.objectBlobStore.close();
this.telemetry.close();
this.threadPool.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public class PhysicalIOConfiguration {
private static final boolean DEFAULT_USE_SINGLE_CACHE = true;
private static final long DEFAULT_BLOCK_SIZE_BYTES = 8 * ONE_MB;
private static final long DEFAULT_READ_AHEAD_BYTES = 64 * ONE_KB;
private static final long DEFAULT_MAX_RANGE_SIZE = 8 * ONE_MB;
private static final long DEFAULT_PART_SIZE = 8 * ONE_MB;
private static final double DEFAULT_SEQUENTIAL_PREFETCH_BASE = 2.0;
private static final double DEFAULT_SEQUENTIAL_PREFETCH_SPEED = 1.0;
private static final long DEFAULT_BLOCK_READ_TIMEOUT = 30_000;
Expand All @@ -47,6 +45,9 @@ public class PhysicalIOConfiguration {
private static final boolean DEFAULT_SMALL_OBJECTS_PREFETCHING_ENABLED = true;
private static final long DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD = 8 * ONE_MB;
private static final int DEFAULT_THREAD_POOL_SIZE = 96;
private static final long DEFAULT_READ_BUFFER_SIZE = 8 * ONE_KB;
private static final long DEFAULT_TARGET_REQUEST_SIZE = 8 * ONE_MB;
private static final double DEFAULT_REQUEST_TOLERANCE_RATIO = 1.4;

/**
* Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_MEMORY_CAPACITY_BYTES} by default.
Expand Down Expand Up @@ -91,19 +92,6 @@ public class PhysicalIOConfiguration {

private static final String READ_AHEAD_BYTES_KEY = "readaheadbytes";

/**
* Maximum range size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_MAX_RANGE_SIZE} by
* default.
*/
@Builder.Default private long maxRangeSizeBytes = DEFAULT_MAX_RANGE_SIZE;

private static final String MAX_RANGE_SIZE_BYTES_KEY = "maxrangesizebytes";

/** Part size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_PART_SIZE} by default. */
@Builder.Default private long partSizeBytes = DEFAULT_PART_SIZE;

private static final String PART_SIZE_BYTES_KEY = "partsizebytes";

/**
* Base constant in the sequential prefetching geometric progression. See {@link
* SequentialReadProgression} for the exact formula. {@link
Expand Down Expand Up @@ -151,6 +139,25 @@ public class PhysicalIOConfiguration {

@Builder.Default private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;

private static final String READ_BUFFER_SIZE_KEY = "readbuffersize";
@Builder.Default private long readBufferSize = DEFAULT_READ_BUFFER_SIZE;

/**
* Target S3 request size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_TARGET_REQUEST_SIZE}
* by default.
*/
@Builder.Default private long targetRequestSize = DEFAULT_TARGET_REQUEST_SIZE;

private static final String TARGET_REQUEST_SIZE_KEY = "target.request.size";

/**
* Request tolerance ratio. {@link PhysicalIOConfiguration#DEFAULT_REQUEST_TOLERANCE_RATIO} by
* default.
*/
@Builder.Default private double requestToleranceRatio = DEFAULT_REQUEST_TOLERANCE_RATIO;

private static final String REQUEST_TOLERANCE_RATIO_KEY = "request.tolerance.ratio";

/** Default set of settings for {@link PhysicalIO} */
public static final PhysicalIOConfiguration DEFAULT = PhysicalIOConfiguration.builder().build();

Expand All @@ -175,8 +182,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
configuration.getInt(METADATA_STORE_CAPACITY_KEY, DEFAULT_CAPACITY_METADATA_STORE))
.blockSizeBytes(configuration.getLong(BLOCK_SIZE_BYTES_KEY, DEFAULT_BLOCK_SIZE_BYTES))
.readAheadBytes(configuration.getLong(READ_AHEAD_BYTES_KEY, DEFAULT_READ_AHEAD_BYTES))
.maxRangeSizeBytes(configuration.getLong(MAX_RANGE_SIZE_BYTES_KEY, DEFAULT_MAX_RANGE_SIZE))
.partSizeBytes(configuration.getLong(PART_SIZE_BYTES_KEY, DEFAULT_PART_SIZE))
.sequentialPrefetchBase(
configuration.getDouble(SEQUENTIAL_PREFETCH_BASE_KEY, DEFAULT_SEQUENTIAL_PREFETCH_BASE))
.sequentialPrefetchSpeed(
Expand All @@ -192,6 +197,11 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
configuration.getLong(
SMALL_OBJECT_SIZE_THRESHOLD_KEY, DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD))
.threadPoolSize(configuration.getInt(THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE))
.readBufferSize(configuration.getLong(READ_BUFFER_SIZE_KEY, DEFAULT_READ_BUFFER_SIZE))
.targetRequestSize(
configuration.getLong(TARGET_REQUEST_SIZE_KEY, DEFAULT_TARGET_REQUEST_SIZE))
.requestToleranceRatio(
configuration.getDouble(REQUEST_TOLERANCE_RATIO_KEY, DEFAULT_REQUEST_TOLERANCE_RATIO))
.build();
}

Expand All @@ -204,8 +214,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param metadataStoreCapacity The capacity of the MetadataStore
* @param blockSizeBytes Block size, in bytes
* @param readAheadBytes Read ahead, in bytes
* @param maxRangeSizeBytes Maximum physical read issued against the object store
* @param partSizeBytes What part size to use when splitting up logical reads
* @param sequentialPrefetchBase Scale factor to control the size of sequentially prefetched
* physical blocks. Example: A constant of 2.0 means doubling the block sizes.
* @param sequentialPrefetchSpeed Constant controlling the rate of growth of sequentially
Expand All @@ -215,6 +223,9 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param smallObjectsPrefetchingEnabled Whether small object prefetching is enabled
* @param smallObjectSizeThreshold Maximum size in bytes for an object to be considered small
* @param threadPoolSize Size of thread pool to be used for async operations
* @param readBufferSize Size of the maximum buffer for read operations
* @param targetRequestSize Target S3 request size, in bytes
* @param requestToleranceRatio Request tolerance ratio
*/
@Builder
private PhysicalIOConfiguration(
Expand All @@ -224,15 +235,16 @@ private PhysicalIOConfiguration(
int metadataStoreCapacity,
long blockSizeBytes,
long readAheadBytes,
long maxRangeSizeBytes,
long partSizeBytes,
double sequentialPrefetchBase,
double sequentialPrefetchSpeed,
long blockReadTimeout,
int blockReadRetryCount,
boolean smallObjectsPrefetchingEnabled,
long smallObjectSizeThreshold,
int threadPoolSize) {
int threadPoolSize,
long readBufferSize,
long targetRequestSize,
double requestToleranceRatio) {
Preconditions.checkArgument(memoryCapacityBytes > 0, "`memoryCapacityBytes` must be positive");
Preconditions.checkArgument(
memoryCleanupFrequencyMilliseconds > 0,
Expand All @@ -243,8 +255,6 @@ private PhysicalIOConfiguration(
metadataStoreCapacity > 0, "`metadataStoreCapacity` must be positive");
Preconditions.checkArgument(blockSizeBytes > 0, "`blockSizeBytes` must be positive");
Preconditions.checkArgument(readAheadBytes > 0, "`readAheadLengthBytes` must be positive");
Preconditions.checkArgument(maxRangeSizeBytes > 0, "`maxRangeSize` must be positive");
Preconditions.checkArgument(partSizeBytes > 0, "`partSize` must be positive");
Preconditions.checkArgument(
sequentialPrefetchBase > 0, "`sequentialPrefetchBase` must be positive");
Preconditions.checkArgument(
Expand All @@ -254,22 +264,27 @@ private PhysicalIOConfiguration(
Preconditions.checkArgument(
smallObjectSizeThreshold > 0, "`smallObjectSizeThreshold` must be positive");
Preconditions.checkNotNull(threadPoolSize > 0, "`threadPoolSize` must be positive");
Preconditions.checkArgument(readBufferSize > 0, "`readBufferSize` must be positive");
Preconditions.checkArgument(targetRequestSize > 0, "`targetRequestSize` must be positive");
Preconditions.checkArgument(
requestToleranceRatio >= 1, "`requestToleranceRatio` must be greater or equal than 1");

this.memoryCapacityBytes = memoryCapacityBytes;
this.memoryCleanupFrequencyMilliseconds = memoryCleanupFrequencyMilliseconds;
this.cacheDataTimeoutMilliseconds = cacheDataTimeoutMilliseconds;
this.metadataStoreCapacity = metadataStoreCapacity;
this.blockSizeBytes = blockSizeBytes;
this.readAheadBytes = readAheadBytes;
this.maxRangeSizeBytes = maxRangeSizeBytes;
this.partSizeBytes = partSizeBytes;
this.sequentialPrefetchBase = sequentialPrefetchBase;
this.sequentialPrefetchSpeed = sequentialPrefetchSpeed;
this.blockReadTimeout = blockReadTimeout;
this.blockReadRetryCount = blockReadRetryCount;
this.smallObjectsPrefetchingEnabled = smallObjectsPrefetchingEnabled;
this.smallObjectSizeThreshold = smallObjectSizeThreshold;
this.threadPoolSize = threadPoolSize;
this.readBufferSize = readBufferSize;
this.targetRequestSize = targetRequestSize;
this.requestToleranceRatio = requestToleranceRatio;
}

@Override
Expand All @@ -284,15 +299,16 @@ public String toString() {
builder.append("\tmetadataStoreCapacity: " + metadataStoreCapacity + "\n");
builder.append("\tblockSizeBytes: " + blockSizeBytes + "\n");
builder.append("\treadAheadBytes: " + readAheadBytes + "\n");
builder.append("\tmaxRangeSizeBytes: " + maxRangeSizeBytes + "\n");
builder.append("\tpartSizeBytes: " + partSizeBytes + "\n");
builder.append("\tsequentialPrefetchBase: " + sequentialPrefetchBase + "\n");
builder.append("\tsequentialPrefetchSpeed: " + sequentialPrefetchSpeed + "\n");
builder.append("\tblockReadTimeout: " + blockReadTimeout + "\n");
builder.append("\tblockReadRetryCount: " + blockReadRetryCount + "\n");
builder.append("\tsmallObjectsPrefetchingEnabled: " + smallObjectsPrefetchingEnabled + "\n");
builder.append("\tsmallObjectSizeThreshold: " + smallObjectSizeThreshold + "\n");
builder.append("\tthreadPoolSize: " + threadPoolSize + "\n");
builder.append("\treadBufferSize: " + readBufferSize + "\n");
builder.append("\ttargetRequestSize: " + targetRequestSize + "\n");
builder.append("\trequestToleranceRatio: " + requestToleranceRatio + "\n");

return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.NonNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,7 +95,15 @@ public int read(long pos) throws IOException {
try {
lock.readLock().lock();
blockManager.makePositionAvailable(pos, ReadMode.SYNC);
return blockManager.getBlock(pos).get().read(pos);
Optional<Block> block = blockManager.getBlock(pos);
if (!block.isPresent()) {
throw new IllegalStateException(
String.format(
"This block object key %s (for position %s) should have been available.",
objectKey.getS3URI(), pos));
}

return block.get().read(pos);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -126,6 +135,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {

while (numBytesRead < len && nextPosition < contentLength()) {
final long nextPositionFinal = nextPosition;

Block nextBlock =
blockManager
.getBlock(nextPosition)
Expand All @@ -134,7 +144,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {
new IllegalStateException(
String.format(
"This block object key %s (for position %s) should have been available.",
objectKey.getS3URI().toString(), nextPositionFinal)));
objectKey.getS3URI(), nextPositionFinal)));

int bytesRead = nextBlock.read(buf, off + numBytesRead, len - numBytesRead, nextPosition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,7 +34,6 @@
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.MetricComputationUtils;
import software.amazon.s3.analyticsaccelerator.util.MetricKey;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
Expand All @@ -48,6 +48,7 @@ public class BlobStore implements Closeable {
private final ObjectClient objectClient;
private final Telemetry telemetry;
private final PhysicalIOConfiguration configuration;
private final ExecutorService threadPool;

@Getter private final Metrics metrics;
final BlobStoreIndexCache indexCache;
Expand All @@ -62,12 +63,14 @@ public class BlobStore implements Closeable {
* @param telemetry an instance of {@link Telemetry} to use
* @param configuration the PhysicalIO configuration
* @param metrics an instance of {@link Metrics} to track metrics across the factory
* @param threadPool a thread pool for async operations
*/
public BlobStore(
@NonNull ObjectClient objectClient,
@NonNull Telemetry telemetry,
@NonNull PhysicalIOConfiguration configuration,
@NonNull Metrics metrics) {
@NonNull Metrics metrics,
@NonNull ExecutorService threadPool) {
this.objectClient = objectClient;
this.telemetry = telemetry;
this.metrics = metrics;
Expand All @@ -82,6 +85,7 @@ public BlobStore(
return cleanupThread;
});
this.configuration = configuration;
this.threadPool = threadPool;
}

/** Schedules a periodic cleanup task to sync the blop map with the index cache */
Expand Down Expand Up @@ -132,14 +136,15 @@ public Blob get(
uri,
metadata,
new BlockManager(
uri,
objectKey,
objectClient,
metadata,
telemetry,
configuration,
metrics,
indexCache,
openStreamInformation),
openStreamInformation,
threadPool),
telemetry));
}

Expand Down Expand Up @@ -169,11 +174,6 @@ public void close() {
}
blobMap.forEach((k, v) -> v.close());
indexCache.cleanUp();
long hits = metrics.get(MetricKey.CACHE_HIT);
long miss = metrics.get(MetricKey.CACHE_MISS);
LOG.debug(
"Cache Hits: {}, Misses: {}, Hit Rate: {}%",
hits, miss, MetricComputationUtils.computeCacheHitRate(hits, miss));
} catch (Exception e) {
LOG.error("Error while closing BlobStore", e);
}
Expand Down
Loading
Loading