Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ tasks.named<Test>("test") {
}

tasks.test {
maxHeapSize = "2G"
// Report is generated and verification is run after tests
finalizedBy(tasks.jacocoTestReport, tasks.jacocoTestCoverageVerification)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@

public class EtagChangeTest extends IntegrationTestBase {

private static final int DEFAULT_READ_AHEAD_BYTES = 64 * ONE_KB;
// This value should be the larger of the default read-ahead bytes and the read
// buffer size. The default read buffer size is currently 128KB (larger than the
// 64KB default read-ahead), so this value should be 128KB.
private static final int MIN_READ_BYTES = 128 * ONE_KB;

@ParameterizedTest
@MethodSource("clientKinds")
Expand Down Expand Up @@ -102,9 +105,7 @@ protected void testChangingEtagMidStream(

// read the next bytes and fail.
IOException ex =
assertThrows(
IOException.class,
() -> readAndAssert(stream, buffer, 200, DEFAULT_READ_AHEAD_BYTES));
assertThrows(IOException.class, () -> readAndAssert(stream, buffer, 200, MIN_READ_BYTES));
S3Exception s3Exception =
assertInstanceOf(S3Exception.class, ex.getCause(), "Cause should be S3Exception");
assertEquals(412, s3Exception.statusCode(), "Expected Precondition Failed (412) status code");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,20 @@ void testingSequentialReadPatterBelowPartitionSize(S3ClientKind clientKind, S3Ob
testAndCompareStreamReadPattern(
clientKind, s3Object, streamReadPattern, s3AALClientStreamReader);

// The physicalIO is reading in 8MB chunks. So the total gets should be Math.ceil(file size /
// 8MB).
// For example, for a 20MB, this will be 3, [0-8MB, 8MB - 16MB, 16MB - 20MB].
int expectedGETCount = (int) Math.ceil((double) s3Object.getSize() / (8 * ONE_MB));
// We expect 2 GET requests. With the DEFAULT_REQUEST_TOLERANCE_RATIO setting in the
// PhysicalIO configuration, a single request may grow beyond the 8MB target, up to
// 8MB * DEFAULT_REQUEST_TOLERANCE_RATIO (1.4 by default) = 11.2MB.
// For example, in this test with a 20MB file, we start reading from position 1MB,
// which means we need to read 19MB of data. So the reads can be split into two
// requests of 8MB and 11MB.

// The sequential prefetcher should download the whole file,
assertEquals(
expectedGETCount,
2,
s3AALClientStreamReader
.getS3SeekableInputStreamFactory()
.getMetrics()
.get(MetricKey.GET_REQUEST_COUNT));

// TODO: This should be fixed with the new PhysicalIO, currently the cache hit metric is
// inaccurate.
// assertEquals(expectedGETCount,
// s3AALClientStreamReader.getS3SeekableInputStreamFactory().getMetrics().get(MetricKey.CACHE_HIT));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has this been cut?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to performance issues we faced, we are no longer publishing the CACHE_HIT and CACHE_MISS metrics in the new PhysicalIO.

}
}

Expand Down Expand Up @@ -111,11 +108,6 @@ void testingSequentialReadPatternAbovePartitionSize(S3ClientKind clientKind) thr
.getS3SeekableInputStreamFactory()
.getMetrics()
.get(MetricKey.GET_REQUEST_COUNT));

// TODO: This should be fixed with the new PhysicalIO, currently the cache hit metric is
// inaccurate.
// assertEquals(2,
// s3AALClientStreamReader.getS3SeekableInputStreamFactory().getMetrics().get(MetricKey.CACHE_HIT));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,17 @@ public S3SeekableInputStreamFactory(
new MetadataStore(
objectClient, telemetry, configuration.getPhysicalIOConfiguration(), metrics);
this.objectFormatSelector = new ObjectFormatSelector(configuration.getLogicalIOConfiguration());
this.objectBlobStore =
new BlobStore(objectClient, telemetry, configuration.getPhysicalIOConfiguration(), metrics);
// TODO: calling applications should be able to pass in a thread pool if they so wish
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's create an issue for this TODO.

this.threadPool =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this thread pool use daemon threads?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow-up in a separate PR

Executors.newFixedThreadPool(
configuration.getPhysicalIOConfiguration().getThreadPoolSize());
this.objectBlobStore =
new BlobStore(
objectClient,
telemetry,
configuration.getPhysicalIOConfiguration(),
metrics,
threadPool);
objectBlobStore.schedulePeriodicCleanup();
}

Expand Down Expand Up @@ -182,5 +187,6 @@ public void close() throws IOException {
this.objectMetadataStore.close();
this.objectBlobStore.close();
this.telemetry.close();
this.threadPool.shutdown();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if close() is never called?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public class PhysicalIOConfiguration {
private static final boolean DEFAULT_USE_SINGLE_CACHE = true;
private static final long DEFAULT_BLOCK_SIZE_BYTES = 8 * ONE_MB;
private static final long DEFAULT_READ_AHEAD_BYTES = 64 * ONE_KB;
private static final long DEFAULT_MAX_RANGE_SIZE = 8 * ONE_MB;
private static final long DEFAULT_PART_SIZE = 8 * ONE_MB;
private static final double DEFAULT_SEQUENTIAL_PREFETCH_BASE = 2.0;
private static final double DEFAULT_SEQUENTIAL_PREFETCH_SPEED = 1.0;
private static final long DEFAULT_BLOCK_READ_TIMEOUT = 30_000;
Expand All @@ -47,6 +45,9 @@ public class PhysicalIOConfiguration {
private static final boolean DEFAULT_SMALL_OBJECTS_PREFETCHING_ENABLED = true;
private static final long DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD = 8 * ONE_MB;
private static final int DEFAULT_THREAD_POOL_SIZE = 96;
private static final long DEFAULT_READ_BUFFER_SIZE = 128 * ONE_KB;
private static final long DEFAULT_TARGET_REQUEST_SIZE = 8 * ONE_MB;
private static final double DEFAULT_REQUEST_TOLERANCE_RATIO = 1.4;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we changing the default read buffer size to 128 KB, and why is the request tolerance set to 1.4?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not changing it; we are adding a new configuration, and it is the maximum block size we want to store. With this new design, our aim is to make requests as close as possible to DEFAULT_TARGET_REQUEST_SIZE. The tolerance gives us the flexibility to exceed DEFAULT_TARGET_REQUEST_SIZE by up to the ratio (8MB * 1.4 = 11.2MB by default). For example, if there is an 11MB request, we don't want to split it into 8MB + 3MB; we want a single 11MB request. But if the total request is 12MB, which is above 11.2MB, we want to split it into 8MB + 4MB. These configurations can be tuned based on the results of different performance tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this tolerance value of 1.4 based on some POCs?


/**
* Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_MEMORY_CAPACITY_BYTES} by default.
Expand Down Expand Up @@ -91,19 +92,6 @@ public class PhysicalIOConfiguration {

private static final String READ_AHEAD_BYTES_KEY = "readaheadbytes";

/**
* Maximum range size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_MAX_RANGE_SIZE} by
* default.
*/
@Builder.Default private long maxRangeSizeBytes = DEFAULT_MAX_RANGE_SIZE;

private static final String MAX_RANGE_SIZE_BYTES_KEY = "maxrangesizebytes";

/** Part size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_PART_SIZE} by default. */
@Builder.Default private long partSizeBytes = DEFAULT_PART_SIZE;

private static final String PART_SIZE_BYTES_KEY = "partsizebytes";

/**
* Base constant in the sequential prefetching geometric progression. See {@link
* SequentialReadProgression} for the exact formula. {@link
Expand Down Expand Up @@ -151,6 +139,25 @@ public class PhysicalIOConfiguration {

@Builder.Default private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;

private static final String READ_BUFFER_SIZE_KEY = "readbuffersize";
@Builder.Default private long readBufferSize = DEFAULT_READ_BUFFER_SIZE;

/**
* Target S3 request size, in bytes. {@link PhysicalIOConfiguration#DEFAULT_TARGET_REQUEST_SIZE}
* by default.
*/
@Builder.Default private long targetRequestSize = DEFAULT_TARGET_REQUEST_SIZE;

private static final String TARGET_REQUEST_SIZE_KEY = "target.request.size";

/**
* Request tolerance ratio. {@link PhysicalIOConfiguration#DEFAULT_REQUEST_TOLERANCE_RATIO} by
* default.
*/
@Builder.Default private double requestToleranceRatio = DEFAULT_REQUEST_TOLERANCE_RATIO;

private static final String REQUEST_TOLERANCE_RATIO_KEY = "request.tolerance.ratio";

/** Default set of settings for {@link PhysicalIO} */
public static final PhysicalIOConfiguration DEFAULT = PhysicalIOConfiguration.builder().build();

Expand All @@ -175,8 +182,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
configuration.getInt(METADATA_STORE_CAPACITY_KEY, DEFAULT_CAPACITY_METADATA_STORE))
.blockSizeBytes(configuration.getLong(BLOCK_SIZE_BYTES_KEY, DEFAULT_BLOCK_SIZE_BYTES))
.readAheadBytes(configuration.getLong(READ_AHEAD_BYTES_KEY, DEFAULT_READ_AHEAD_BYTES))
.maxRangeSizeBytes(configuration.getLong(MAX_RANGE_SIZE_BYTES_KEY, DEFAULT_MAX_RANGE_SIZE))
.partSizeBytes(configuration.getLong(PART_SIZE_BYTES_KEY, DEFAULT_PART_SIZE))
.sequentialPrefetchBase(
configuration.getDouble(SEQUENTIAL_PREFETCH_BASE_KEY, DEFAULT_SEQUENTIAL_PREFETCH_BASE))
.sequentialPrefetchSpeed(
Expand All @@ -192,6 +197,11 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
configuration.getLong(
SMALL_OBJECT_SIZE_THRESHOLD_KEY, DEFAULT_SMALL_OBJECT_SIZE_THRESHOLD))
.threadPoolSize(configuration.getInt(THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE))
.readBufferSize(configuration.getLong(READ_BUFFER_SIZE_KEY, DEFAULT_READ_BUFFER_SIZE))
.targetRequestSize(
configuration.getLong(TARGET_REQUEST_SIZE_KEY, DEFAULT_TARGET_REQUEST_SIZE))
.requestToleranceRatio(
configuration.getDouble(REQUEST_TOLERANCE_RATIO_KEY, DEFAULT_REQUEST_TOLERANCE_RATIO))
.build();
}

Expand All @@ -204,8 +214,6 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param metadataStoreCapacity The capacity of the MetadataStore
* @param blockSizeBytes Block size, in bytes
* @param readAheadBytes Read ahead, in bytes
* @param maxRangeSizeBytes Maximum physical read issued against the object store
* @param partSizeBytes What part size to use when splitting up logical reads
* @param sequentialPrefetchBase Scale factor to control the size of sequentially prefetched
* physical blocks. Example: A constant of 2.0 means doubling the block sizes.
* @param sequentialPrefetchSpeed Constant controlling the rate of growth of sequentially
Expand All @@ -215,6 +223,9 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param smallObjectsPrefetchingEnabled Whether small object prefetching is enabled
* @param smallObjectSizeThreshold Maximum size in bytes for an object to be considered small
* @param threadPoolSize Size of thread pool to be used for async operations
* @param readBufferSize Size of the maximum buffer for read operations
* @param targetRequestSize Target S3 request size, in bytes
* @param requestToleranceRatio Request tolerance ratio
*/
@Builder
private PhysicalIOConfiguration(
Expand All @@ -224,15 +235,16 @@ private PhysicalIOConfiguration(
int metadataStoreCapacity,
long blockSizeBytes,
long readAheadBytes,
long maxRangeSizeBytes,
long partSizeBytes,
double sequentialPrefetchBase,
double sequentialPrefetchSpeed,
long blockReadTimeout,
int blockReadRetryCount,
boolean smallObjectsPrefetchingEnabled,
long smallObjectSizeThreshold,
int threadPoolSize) {
int threadPoolSize,
long readBufferSize,
long targetRequestSize,
double requestToleranceRatio) {
Preconditions.checkArgument(memoryCapacityBytes > 0, "`memoryCapacityBytes` must be positive");
Preconditions.checkArgument(
memoryCleanupFrequencyMilliseconds > 0,
Expand All @@ -243,8 +255,6 @@ private PhysicalIOConfiguration(
metadataStoreCapacity > 0, "`metadataStoreCapacity` must be positive");
Preconditions.checkArgument(blockSizeBytes > 0, "`blockSizeBytes` must be positive");
Preconditions.checkArgument(readAheadBytes > 0, "`readAheadLengthBytes` must be positive");
Preconditions.checkArgument(maxRangeSizeBytes > 0, "`maxRangeSize` must be positive");
Preconditions.checkArgument(partSizeBytes > 0, "`partSize` must be positive");
Preconditions.checkArgument(
sequentialPrefetchBase > 0, "`sequentialPrefetchBase` must be positive");
Preconditions.checkArgument(
Expand All @@ -255,22 +265,27 @@ private PhysicalIOConfiguration(
Preconditions.checkArgument(
smallObjectSizeThreshold > 0, "`smallObjectSizeThreshold` must be positive");
Preconditions.checkNotNull(threadPoolSize > 0, "`threadPoolSize` must be positive");
Preconditions.checkArgument(readBufferSize > 0, "`readBufferSize` must be positive");
Preconditions.checkArgument(targetRequestSize > 0, "`targetRequestSize` must be positive");
Preconditions.checkArgument(
requestToleranceRatio >= 1, "`requestToleranceRatio` must be greater than or equal than 1");

this.memoryCapacityBytes = memoryCapacityBytes;
this.memoryCleanupFrequencyMilliseconds = memoryCleanupFrequencyMilliseconds;
this.cacheDataTimeoutMilliseconds = cacheDataTimeoutMilliseconds;
this.metadataStoreCapacity = metadataStoreCapacity;
this.blockSizeBytes = blockSizeBytes;
this.readAheadBytes = readAheadBytes;
this.maxRangeSizeBytes = maxRangeSizeBytes;
this.partSizeBytes = partSizeBytes;
this.sequentialPrefetchBase = sequentialPrefetchBase;
this.sequentialPrefetchSpeed = sequentialPrefetchSpeed;
this.blockReadTimeout = blockReadTimeout;
this.blockReadRetryCount = blockReadRetryCount;
this.smallObjectsPrefetchingEnabled = smallObjectsPrefetchingEnabled;
this.smallObjectSizeThreshold = smallObjectSizeThreshold;
this.threadPoolSize = threadPoolSize;
this.readBufferSize = readBufferSize;
this.targetRequestSize = targetRequestSize;
this.requestToleranceRatio = requestToleranceRatio;
}

@Override
Expand All @@ -285,15 +300,16 @@ public String toString() {
builder.append("\tmetadataStoreCapacity: " + metadataStoreCapacity + "\n");
builder.append("\tblockSizeBytes: " + blockSizeBytes + "\n");
builder.append("\treadAheadBytes: " + readAheadBytes + "\n");
builder.append("\tmaxRangeSizeBytes: " + maxRangeSizeBytes + "\n");
builder.append("\tpartSizeBytes: " + partSizeBytes + "\n");
builder.append("\tsequentialPrefetchBase: " + sequentialPrefetchBase + "\n");
builder.append("\tsequentialPrefetchSpeed: " + sequentialPrefetchSpeed + "\n");
builder.append("\tblockReadTimeout: " + blockReadTimeout + "\n");
builder.append("\tblockReadRetryCount: " + blockReadRetryCount + "\n");
builder.append("\tsmallObjectsPrefetchingEnabled: " + smallObjectsPrefetchingEnabled + "\n");
builder.append("\tsmallObjectSizeThreshold: " + smallObjectSizeThreshold + "\n");
builder.append("\tthreadPoolSize: " + threadPoolSize + "\n");
builder.append("\treadBufferSize: " + readBufferSize + "\n");
builder.append("\ttargetRequestSize: " + targetRequestSize + "\n");
builder.append("\trequestToleranceRatio: " + requestToleranceRatio + "\n");

return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.NonNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,7 +95,15 @@ public int read(long pos) throws IOException {
try {
lock.readLock().lock();
blockManager.makePositionAvailable(pos, ReadMode.SYNC);
return blockManager.getBlock(pos).get().read(pos);
Optional<Block> block = blockManager.getBlock(pos);
Comment thread
stubz151 marked this conversation as resolved.
return block
.orElseThrow(
() ->
new IllegalStateException(
String.format(
"This block object key %s (for position %s) should have been available.",
objectKey.getS3URI(), pos)))
.read(pos);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -126,6 +135,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {

while (numBytesRead < len && nextPosition < contentLength()) {
final long nextPositionFinal = nextPosition;

Block nextBlock =
blockManager
.getBlock(nextPosition)
Expand All @@ -134,7 +144,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {
new IllegalStateException(
String.format(
"This block object key %s (for position %s) should have been available.",
objectKey.getS3URI().toString(), nextPositionFinal)));
objectKey.getS3URI(), nextPositionFinal)));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a toString() override on S3URI now?

Copy link
Copy Markdown
Collaborator Author

@ozkoca ozkoca Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, S3URI already has a toString() method.


int bytesRead = nextBlock.read(buf, off + numBytesRead, len - numBytesRead, nextPosition);

Expand Down
Loading
Loading