Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,35 @@
*/
package software.amazon.s3.analyticsaccelerator.request;

import lombok.AllArgsConstructor;

/**
* Enum to help with the annotation of reads. We mark reads SYNC when they were triggered by a
* synchronous read or ASYNC when they were to do logical or physical prefetching.
*/
@AllArgsConstructor
public enum ReadMode {
SYNC,
ASYNC,
SMALL_OBJECT_PREFETCH;
SYNC(true),
ASYNC(true),
SMALL_OBJECT_PREFETCH(true),
SEQUENTIAL_FILE_PREFETCH(true),
DICTIONARY_PREFETCH(false),
COLUMN_PREFETCH(false),
REMAINING_COLUMN_PREFETCH(false),
PREFETCH_TAIL(false),
READ_VECTORED(false);

private final boolean allowRequestExtension;

/**
* Should requests be extended for this read mode?
*
* <p>When the read is from the parquet prefetcher or readVectored(), we know the exact ranges we
* want to read, so in this case don't extend the ranges.
*
* @return true if requests should be extended
*/
public boolean allowRequestExtension() {
return allowRequestExtension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ public enum MetricKey {
/**
* Tracks the number of cache misses. Incremented when requested block is not found in the cache
*/
CACHE_MISS("CacheMiss");
CACHE_MISS("CacheMiss"),

/** Counts number of GET requests made. */
GET_REQUEST_COUNT("GetRequestCount"),

/** Counts number of HEAD requests made. */
HEAD_REQUEST_COUNT("HeadRequestCount");

/** The string name representation of the metric. */
private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ public void testMetricKeyNames() {
assertEquals("MemoryUsage", MetricKey.MEMORY_USAGE.getName());
assertEquals("CacheHit", MetricKey.CACHE_HIT.getName());
assertEquals("CacheMiss", MetricKey.CACHE_MISS.getName());
assertEquals("GetRequestCount", MetricKey.GET_REQUEST_COUNT.getName());
assertEquals("HeadRequestCount", MetricKey.HEAD_REQUEST_COUNT.getName());
}

@Test
public void testEnumValues() {
MetricKey[] values = MetricKey.values();
assertEquals(3, values.length);
assertEquals(5, values.length);
assertEquals(MetricKey.MEMORY_USAGE, values[0]);
assertEquals(MetricKey.CACHE_HIT, values[1]);
assertEquals(MetricKey.CACHE_MISS, values[2]);
assertEquals(MetricKey.GET_REQUEST_COUNT, values[3]);
assertEquals(MetricKey.HEAD_REQUEST_COUNT, values[4]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntFunction;
import java.util.stream.Stream;
import lombok.NonNull;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -45,7 +43,6 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

Expand Down Expand Up @@ -187,58 +184,6 @@ protected void testChangingEtagMidStream(
}
}

/**
* This test verifies that the data in the buffers is the same when a file is read through
* readVectored() vs stream.read(buf[], off, len).
*
* @param s3ClientKind S3 client kind to use
* @param s3Object S3 object to read
* @param streamReadPatternKind stream read pattern to apply
* @param AALInputStreamConfigurationKind configuration kind
* @param allocate method to allocate the buffer, can be direct or non-direct
* @throws IOException on any IOException
*/
protected void testReadVectored(
@NonNull S3ClientKind s3ClientKind,
@NonNull S3Object s3Object,
@NonNull StreamReadPatternKind streamReadPatternKind,
@NonNull AALInputStreamConfigurationKind AALInputStreamConfigurationKind,
@NonNull IntFunction<ByteBuffer> allocate)
throws IOException {

try (S3AALClientStreamReader s3AALClientStreamReader =
this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {

S3SeekableInputStream s3SeekableInputStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);

List<ObjectRange> objectRanges = new ArrayList<>();
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, 500));
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 1000, 800));
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 4000, 5000));

s3SeekableInputStream.readVectored(
objectRanges,
allocate,
(buffer) -> {
LOG.debug("Release buffer of length {}: {}", buffer.limit(), buffer);
});

for (ObjectRange objectRange : objectRanges) {
ByteBuffer byteBuffer = objectRange.getByteBuffer().join();

S3SeekableInputStream verificationStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
verificationStream.seek(objectRange.getOffset());
byte[] buffer = new byte[objectRange.getLength()];
int readBytes = verificationStream.read(buffer, 0, buffer.length);

assertEquals(readBytes, buffer.length);
verifyBufferContentsEqual(byteBuffer, buffer);
}
}
}

/**
* Used to read and assert helps when we want to run it in a lambda.
*
Expand All @@ -254,18 +199,6 @@ private void readAndAssert(S3SeekableInputStream stream, byte[] buffer, int offs
assertEquals(readBytes, len);
}

/**
* Verify the contents of two buffers are equal
*
* @param buffer ByteBuffer to verify contents for
* @param expected expected contents in byte buffer
*/
private void verifyBufferContentsEqual(ByteBuffer buffer, byte[] expected) {
for (int i = 0; i < expected.length; i++) {
assertEquals(buffer.get(i), expected[i]);
}
}

/**
* Tests to make sure if we have read our whole object we pass and return our cached data even if
* the etag has changed after the read is complete
Expand Down
Loading
Loading