Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,10 @@ public ObjectRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int le
this.offset = offset;
this.length = length;
}

@Override
public String toString() {
return String.format(
"offset: %d, length: %d, completable-future: %s", offset, length, byteBuffer.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* like "bytes=0-555" -- this is SDK detail we should not care about in layers above Object Client.
*/
@Value
public class Range {
public class Range implements Comparable<Range> {
@Getter long start;
@Getter long end;

Expand Down Expand Up @@ -85,4 +85,15 @@ public String toString() {
public String toHttpString() {
return String.format(TO_HTTP_STRING_FORMAT, start, end);
}

/**
* Allows sorting ranges based on their start position.
*
* @param other
* @return
*/
@Override
public int compareTo(Range other) {
return Long.compare(this.start, other.start);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@
*/
@AllArgsConstructor
public enum ReadMode {
SYNC(true),
ASYNC(true),
SMALL_OBJECT_PREFETCH(true),
SEQUENTIAL_FILE_PREFETCH(true),
DICTIONARY_PREFETCH(false),
COLUMN_PREFETCH(false),
REMAINING_COLUMN_PREFETCH(false),
PREFETCH_TAIL(false),
READ_VECTORED(false);
SYNC(true, false),
ASYNC(true, false),
SMALL_OBJECT_PREFETCH(true, false),
SEQUENTIAL_FILE_PREFETCH(true, false),
DICTIONARY_PREFETCH(false, false),
COLUMN_PREFETCH(false, true),
REMAINING_COLUMN_PREFETCH(false, false),
PREFETCH_TAIL(false, false),
READ_VECTORED(false, true);

private final boolean allowRequestExtension;
private final boolean coalesceRequests;

/**
* Should requests be extended for this read mode?
Expand All @@ -46,4 +47,17 @@ public enum ReadMode {
public boolean allowRequestExtension() {
return allowRequestExtension;
}

/**
* Should requests be coalesced for this read mode?
*
* <p>When the read is from the parquet prefetcher or readVectored(), we know the exact ranges we
* want to read, so in this case don't extend the ranges. Yet if we are reading consecutive
* columns we want to merge requests into one.
*
* @return true if requests should be coalesced
*/
public boolean coalesceRequests() {
return coalesceRequests;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,45 @@ void testGetRequiredStringThrowsIfNotSet() {
IllegalArgumentException.class, () -> configuration.getRequiredString("stringConfig1"));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we squash some of these tests down using parameterized tests?

@Test
void testGetPositiveLong() {
ConnectorConfiguration configuration = getDefaultConfiguration(TEST_PREFIX);
long positiveLong = configuration.getPositiveLong("longConfig", 5);
assertEquals(1, positiveLong);
}

@Test
void testGetPositiveLongWithDefault() {
ConnectorConfiguration configuration = getDefaultConfiguration(TEST_PREFIX);
long positiveLong = configuration.getPositiveLong("nonExistentKey", 10);
assertEquals(10, positiveLong);
}

@Test
void testGetPositiveLongThrowsOnZero() {
Map<String, String> configMap = new HashMap<>();
configMap.put(TEST_PREFIX + ".zeroConfig", "0");
ConnectorConfiguration configuration = new ConnectorConfiguration(configMap, TEST_PREFIX);
assertThrows(
IllegalArgumentException.class, () -> configuration.getPositiveLong("zeroConfig", 5));
}

@Test
void testGetPositiveLongThrowsOnNegative() {
Map<String, String> configMap = new HashMap<>();
configMap.put(TEST_PREFIX + ".negativeConfig", "-5");
ConnectorConfiguration configuration = new ConnectorConfiguration(configMap, TEST_PREFIX);
assertThrows(
IllegalArgumentException.class, () -> configuration.getPositiveLong("negativeConfig", 5));
}

@Test
void testGetPositiveLongThrowsOnNegativeDefault() {
ConnectorConfiguration configuration = getDefaultConfiguration(TEST_PREFIX);
assertThrows(
IllegalArgumentException.class, () -> configuration.getPositiveLong("nonExistentKey", -1));
}

private static ConnectorConfiguration getDefaultConfiguration(String prefix) {

return new ConnectorConfiguration(getDefaultConfigurationMap(prefix), prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,33 @@ public class ReadVectoredTest extends IntegrationTestBase {

@ParameterizedTest
@MethodSource("vectoredReads")
void testVectoredReads(S3ClientKind s3ClientKind, IntFunction<ByteBuffer> allocate)
void testVectoredReads(
S3ClientKind s3ClientKind,
IntFunction<ByteBuffer> allocate,
AALInputStreamConfigurationKind configurationKind)
throws IOException {
testReadVectored(
s3ClientKind, S3Object.RANDOM_1GB, AALInputStreamConfigurationKind.DEFAULT, allocate);
testReadVectored(s3ClientKind, S3Object.RANDOM_1GB, configurationKind, allocate);
}

@ParameterizedTest
@MethodSource("vectoredReads")
void testVectoredReadsInSingleBlock(S3ClientKind s3ClientKind, IntFunction<ByteBuffer> allocate)
void testVectoredReadsInSingleBlock(
S3ClientKind s3ClientKind,
IntFunction<ByteBuffer> allocate,
AALInputStreamConfigurationKind configurationKind)
throws IOException {
testReadVectoredInSingleBlock(
s3ClientKind, S3Object.RANDOM_1GB, AALInputStreamConfigurationKind.DEFAULT, allocate);
testReadVectoredInSingleBlock(s3ClientKind, S3Object.RANDOM_1GB, configurationKind, allocate);
}

@ParameterizedTest
@MethodSource("vectoredReads")
void testVectoredReadsForSequentialRanges(
S3ClientKind s3ClientKind, IntFunction<ByteBuffer> allocate) throws IOException {
S3ClientKind s3ClientKind,
IntFunction<ByteBuffer> allocate,
AALInputStreamConfigurationKind configurationKind)
throws IOException {
testReadVectoredForSequentialRanges(
s3ClientKind, S3Object.RANDOM_1GB, AALInputStreamConfigurationKind.DEFAULT, allocate);
s3ClientKind, S3Object.RANDOM_1GB, configurationKind, allocate);
}

@Test
Expand Down Expand Up @@ -281,13 +288,18 @@ static Stream<Arguments> vectoredReads() {
s3ClientKinds.add(S3ClientKind.SDK_V2_JAVA_ASYNC);
s3ClientKinds.add(S3ClientKind.SDK_V2_JAVA_SYNC);

List<AALInputStreamConfigurationKind> configurationKinds = new ArrayList<>();
configurationKinds.add(AALInputStreamConfigurationKind.DEFAULT);
configurationKinds.add(AALInputStreamConfigurationKind.NO_REQUEST_COALESCING);

List<IntFunction<ByteBuffer>> allocate = new ArrayList<>();
allocate.add(ByteBuffer::allocate);
allocate.add(ByteBuffer::allocateDirect);

for (S3ClientKind s3ClientKind : s3ClientKinds) {
for (IntFunction<ByteBuffer> allocator : allocate) {
testCases.add(Arguments.of(s3ClientKind, allocator));
for (AALInputStreamConfigurationKind configurationKind : configurationKinds)
testCases.add(Arguments.of(s3ClientKind, allocator, configurationKind));
}
}

Expand All @@ -300,26 +312,26 @@ static Stream<Arguments> vectoredReads() {
*
* @param s3ClientKind S3 client kind to use
* @param s3Object S3 object to read
* @param AALInputStreamConfigurationKind configuration kind
* @param configurationKind configuration kind
* @param allocate method to allocate the buffer, can be direct or non-direct
* @throws IOException on any IOException
*/
protected void testReadVectored(
@NonNull S3ClientKind s3ClientKind,
@NonNull S3Object s3Object,
@NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind,
@NonNull AALInputStreamConfigurationKind configurationKind,
@NonNull IntFunction<ByteBuffer> allocate)
throws IOException {

try (S3AALClientStreamReader s3AALClientStreamReader =
getStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {
getStreamReader(s3ClientKind, configurationKind)) {

S3SeekableInputStream s3SeekableInputStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.ofDefaults());

List<ObjectRange> objectRanges = new ArrayList<>();
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, ONE_MB));
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 2 * ONE_MB, 800));
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 3 * ONE_MB, 800));

// a range that should be within a single block
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 200 * ONE_MB, 8 * ONE_MB));
Expand All @@ -340,7 +352,8 @@ protected void testReadVectored(
objectRange.getByteBuffer().join();
}

// Range [50MB - 51MB, 2MB - 2.8MB] will make 2 GET requests
// Range [50 - 1MB, 3MB - 3.8MB] will make 2 GET requests -- will not be coalesced with 1MB
// tolerance
// Range [200MB - 208MB] will make a single GET as it is an 8MB block.
// Range [260MB - 284MB] will make 3 GET requests
assertEquals(
Expand All @@ -357,12 +370,12 @@ protected void testReadVectored(
protected void testReadVectoredInSingleBlock(
@NonNull S3ClientKind s3ClientKind,
@NonNull S3Object s3Object,
@NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind,
@NonNull AALInputStreamConfigurationKind configurationKind,
@NonNull IntFunction<ByteBuffer> allocate)
throws IOException {

try (S3AALClientStreamReader s3AALClientStreamReader =
this.getStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {
this.getStreamReader(s3ClientKind, configurationKind)) {

S3SeekableInputStream s3SeekableInputStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.ofDefaults());
Expand Down Expand Up @@ -391,12 +404,12 @@ protected void testReadVectoredInSingleBlock(
protected void testReadVectoredForSequentialRanges(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see that this tests range coalescing, but it would be good to add a range coalescing test as well, which will test ranges are within that 1MB distance. eg: [0-4MB, 4.3MB - 5MB, 5.9MB - 7MB].

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ahmarsuhail. Do test under IOPlanTest cover your concern. I was trying to put corner cases for actual coalesing business logic there instead of here.

@NonNull S3ClientKind s3ClientKind,
@NonNull S3Object s3Object,
@NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind,
@NonNull AALInputStreamConfigurationKind configurationKind,
@NonNull IntFunction<ByteBuffer> allocate)
throws IOException {

try (S3AALClientStreamReader s3AALClientStreamReader =
this.getStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {
this.getStreamReader(s3ClientKind, configurationKind)) {

S3SeekableInputStream s3SeekableInputStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.ofDefaults());
Expand All @@ -416,12 +429,18 @@ protected void testReadVectoredForSequentialRanges(
objectRange.getByteBuffer().join();
}

int expectedRequests =
(configurationKind == AALInputStreamConfigurationKind.NO_REQUEST_COALESCING)
? objectRanges.size()
: 3;

assertEquals(
5,
expectedRequests,
s3AALClientStreamReader
.getS3SeekableInputStreamFactory()
.getMetrics()
.get(MetricKey.GET_REQUEST_COUNT));
verifyStreamContents(objectRanges, s3AALClientStreamReader, s3Object);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,13 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati
PhysicalIO createPhysicalIO(S3URI s3URI, OpenStreamInformation openStreamInformation)
throws IOException {
return new PhysicalIOImpl(
s3URI, objectMetadataStore, objectBlobStore, telemetry, openStreamInformation, threadPool);
s3URI,
objectMetadataStore,
objectBlobStore,
telemetry,
openStreamInformation,
threadPool,
configuration.getPhysicalIOConfiguration());
}

void storeObjectMetadata(S3URI s3URI, ObjectMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class PhysicalIOConfiguration {
private static final long DEFAULT_READ_BUFFER_SIZE = 128 * ONE_KB;
private static final long DEFAULT_TARGET_REQUEST_SIZE = 8 * ONE_MB;
private static final double DEFAULT_REQUEST_TOLERANCE_RATIO = 1.4;
private static final boolean DEFAULT_COALESCE_REQUEST = true;
private static final long DEFAULT_COALESCE_REQUEST_TOLERANCE = ONE_MB;

/**
* Capacity, in blobs. {@link PhysicalIOConfiguration#DEFAULT_MEMORY_CAPACITY_BYTES} by default.
Expand Down Expand Up @@ -175,6 +177,16 @@ public class PhysicalIOConfiguration {

private static final String REQUEST_TOLERANCE_RATIO_KEY = "request.tolerance.ratio";

/** Flag to enable request Coalescing */
@Builder.Default private boolean requestCoalesce = DEFAULT_COALESCE_REQUEST;

private static final String REQUEST_COALESCE_KEY = "request.coalesce";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add it as a config? when would we not want to do this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We may rename like request.coalesce.enabled


/** Number of bytes to read and swallow when merging two requests */
@Builder.Default private long requestCoalesceTolerance = DEFAULT_COALESCE_REQUEST_TOLERANCE;

private static final String REQUEST_COALESCE_TOLERANCE_KEY = "request.coalesce.tolerance";

/** Default set of settings for {@link PhysicalIO} */
public static final PhysicalIOConfiguration DEFAULT = PhysicalIOConfiguration.builder().build();

Expand Down Expand Up @@ -225,6 +237,10 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
configuration.getLong(TARGET_REQUEST_SIZE_KEY, DEFAULT_TARGET_REQUEST_SIZE))
.requestToleranceRatio(
configuration.getDouble(REQUEST_TOLERANCE_RATIO_KEY, DEFAULT_REQUEST_TOLERANCE_RATIO))
.requestCoalesce(configuration.getBoolean(REQUEST_COALESCE_KEY, DEFAULT_COALESCE_REQUEST))
.requestCoalesceTolerance(
configuration.getLong(
REQUEST_COALESCE_TOLERANCE_KEY, DEFAULT_COALESCE_REQUEST_TOLERANCE))
.build();
}

Expand All @@ -251,6 +267,9 @@ public static PhysicalIOConfiguration fromConfiguration(ConnectorConfiguration c
* @param readBufferSize Size of the maximum buffer for read operations
* @param targetRequestSize Target S3 request size, in bytes
* @param requestToleranceRatio Request tolerance ratio
* @param requestCoalesce Flag to enable request Coalescing
* @param requestCoalesceTolerance Number of bytes to read and swallow when merging two requests
* @throws IllegalArgumentException if any of the parameters are invalid
*/
@Builder
private PhysicalIOConfiguration(
Expand All @@ -271,7 +290,9 @@ private PhysicalIOConfiguration(
int threadPoolSize,
long readBufferSize,
long targetRequestSize,
double requestToleranceRatio) {
double requestToleranceRatio,
boolean requestCoalesce,
long requestCoalesceTolerance) {
Preconditions.checkArgument(memoryCapacityBytes > 0, "`memoryCapacityBytes` must be positive");
Preconditions.checkArgument(
memoryCleanupFrequencyMilliseconds > 0,
Expand Down Expand Up @@ -319,6 +340,8 @@ private PhysicalIOConfiguration(
this.readBufferSize = readBufferSize;
this.targetRequestSize = targetRequestSize;
this.requestToleranceRatio = requestToleranceRatio;
this.requestCoalesce = requestCoalesce;
this.requestCoalesceTolerance = requestCoalesceTolerance;
}

@Override
Expand All @@ -345,7 +368,8 @@ public String toString() {
builder.append("\treadBufferSize: " + readBufferSize + "\n");
builder.append("\ttargetRequestSize: " + targetRequestSize + "\n");
builder.append("\trequestToleranceRatio: " + requestToleranceRatio + "\n");

builder.append("\trequestCoalesce: " + requestCoalesce + "\n");
builder.append("\trequestCoalesceTolerance: " + requestCoalesceTolerance + "\n");
return builder.toString();
}
}
Loading
Loading