Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ private Runnable processReadTask(final List<Block> blocks, ReadMode readMode) {
.referrer(new Referrer(requestRange.toHttpString(), readMode))
.build();

openStreamInformation.getRequestCallback().onGetRequest();

// Fetch the object content from S3
ObjectContent objectContent = fetchObjectContent(getRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import software.amazon.s3.analyticsaccelerator.util.MetricKey;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

@SuppressFBWarnings(
Expand All @@ -50,7 +51,8 @@ public class StreamReaderTest {
private ObjectKey mockObjectKey;
private ExecutorService mockExecutorService;
private Consumer<List<Block>> mockRemoveBlocksFunc;
private OpenStreamInformation mockOpenStreamInfo;
private OpenStreamInformation openStreamInfo;
private RequestCallback mockRequestCallback;
private Metrics mockMetrics;

private StreamReader streamReader;
Expand All @@ -63,7 +65,9 @@ void setUp() {
mockExecutorService = mock(ExecutorService.class);
mockRemoveBlocksFunc = mock(Consumer.class);
mockMetrics = mock(Metrics.class);
mockOpenStreamInfo = mock(OpenStreamInformation.class);
mockRequestCallback = mock(RequestCallback.class);

openStreamInfo = OpenStreamInformation.builder().requestCallback(mockRequestCallback).build();

streamReader =
new StreamReader(
Expand All @@ -72,7 +76,7 @@ void setUp() {
mockExecutorService,
mockRemoveBlocksFunc,
mockMetrics,
mockOpenStreamInfo);
openStreamInfo);
}

@Test
Expand All @@ -86,7 +90,7 @@ void test_initializeExceptions() {
mockExecutorService,
mockRemoveBlocksFunc,
mockMetrics,
mockOpenStreamInfo));
openStreamInfo));

assertThrows(
NullPointerException.class,
Expand All @@ -97,7 +101,7 @@ void test_initializeExceptions() {
mockExecutorService,
mockRemoveBlocksFunc,
mockMetrics,
mockOpenStreamInfo));
openStreamInfo));

assertThrows(
NullPointerException.class,
Expand All @@ -108,7 +112,7 @@ void test_initializeExceptions() {
null,
mockRemoveBlocksFunc,
mockMetrics,
mockOpenStreamInfo));
openStreamInfo));

assertThrows(
NullPointerException.class,
Expand All @@ -119,7 +123,7 @@ void test_initializeExceptions() {
mockExecutorService,
null,
mockMetrics,
mockOpenStreamInfo));
openStreamInfo));

assertThrows(
NullPointerException.class,
Expand All @@ -130,7 +134,7 @@ void test_initializeExceptions() {
mockExecutorService,
mockRemoveBlocksFunc,
null,
mockOpenStreamInfo));
openStreamInfo));

assertThrows(
NullPointerException.class,
Expand Down Expand Up @@ -181,26 +185,27 @@ void processReadTask_successfulRead_populatesBlocks() {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
readTask.run();

verify(mockRemoveBlocksFunc, never()).accept(any());
verify(mockObjectClient).getObject(any(GetRequest.class), eq(mockOpenStreamInfo));
verify(mockObjectClient).getObject(any(GetRequest.class), eq(openStreamInfo));
verifyNoMoreInteractions(mockObjectClient);
verify(mockMetrics).add(MetricKey.GET_REQUEST_COUNT, 1);
verifyNoMoreInteractions(mockMetrics);
verify(block).setData(testData);
verify(mockRequestCallback, times(1)).onGetRequest();
}

@Test
void processReadTask_fetchObjectContentFails_callsRemoveBlocks() {
Block block = createMockBlock(0, 4);
List<Block> blocks = Collections.singletonList(block);

when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenThrow(new RuntimeException("fail"));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -220,7 +225,7 @@ void processReadTask_readBlocksFromStreamThrowsEOFException_callsRemoveBlocks()

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(throwingStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -240,7 +245,7 @@ void processReadTask_readBlocksFromStreamThrowsIOException_callsRemoveBlocks()

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(throwingStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -260,7 +265,7 @@ void processReadTask_multipleBlocks_readsAllSuccessfully() throws Exception {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -282,7 +287,7 @@ void processReadTask_blocksWithGaps_skipsCorrectly() throws Exception {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -303,7 +308,7 @@ void processReadTask_streamTooShort_callsRemoveBlocks() throws Exception {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -322,7 +327,7 @@ void processReadTask_skipFailsDueToEOF_callsRemoveBlocks() throws Exception {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -341,7 +346,7 @@ void processReadTask_tracksMetrics() throws Exception {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand All @@ -360,7 +365,7 @@ void processReadTask_asyncReadMode_buildsCorrectRequest() throws Exception {

ObjectContent mockContent = mock(ObjectContent.class);
when(mockContent.getStream()).thenReturn(testStream);
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(mockContent));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.ASYNC);
Expand All @@ -373,7 +378,7 @@ void processReadTask_asyncReadMode_buildsCorrectRequest() throws Exception {
Referrer referrer = request.getReferrer();
return referrer.getReadMode() == ReadMode.ASYNC;
}),
eq(mockOpenStreamInfo));
eq(openStreamInfo));
}

@Test
Expand All @@ -386,7 +391,7 @@ void processReadTask_removeNonFilledBlocksFromStore_filtersCorrectly() {
List<Block> blocks = Arrays.asList(filledBlock, unfilledBlock);

// Simulate failure scenario
when(mockObjectClient.getObject(any(GetRequest.class), eq(mockOpenStreamInfo)))
when(mockObjectClient.getObject(any(GetRequest.class), eq(openStreamInfo)))
.thenReturn(completedFuture(null));

Runnable readTask = invokeProcessReadTask(blocks, ReadMode.SYNC);
Expand Down