-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed #5176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
54e706a
39c3403
8cad3d5
a8eb44b
11ff043
ddefd2e
6a1e0a6
02d39ca
2a6dcb6
fc833f2
11dd7bf
ac1e758
64546a5
4902598
c76f610
455a472
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,11 +20,13 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.Random; | ||
| import java.util.concurrent.ExecutionException; | ||
|
|
||
| import org.assertj.core.api.Assertions; | ||
| import org.junit.After; | ||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
| import org.mockito.Mockito; | ||
|
|
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends | |
| REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec | ||
| private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; | ||
|
|
||
| @After | ||
| public void afterTest() throws InterruptedException { | ||
|
||
| //thread wait so that previous test's inProgress buffers are processed and removed. | ||
| Thread.sleep(10000l); | ||
|
||
| ReadBufferManager readBufferManager = ReadBufferManager.getBufferManager(); | ||
| while(readBufferManager.getCompletedReadListSize() > 0) { | ||
| readBufferManager.callTryEvict(); | ||
| } | ||
| } | ||
|
|
||
| private AbfsRestOperation getMockRestOp() { | ||
| AbfsRestOperation op = mock(AbfsRestOperation.class); | ||
| AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); | ||
|
|
@@ -106,7 +118,6 @@ private AbfsClient getMockAbfsClient() { | |
| private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, | ||
| String fileName) throws IOException { | ||
| AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); | ||
| inputStreamContext.isReadAheadEnabled(true); | ||
| // Create AbfsInputStream with the client instance | ||
| AbfsInputStream inputStream = new AbfsInputStream( | ||
| mockAbfsClient, | ||
|
|
@@ -132,7 +143,6 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, | |
| boolean alwaysReadBufferSize, | ||
| int readAheadBlockSize) throws IOException { | ||
| AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); | ||
| inputStreamContext.isReadAheadEnabled(true); | ||
| // Create AbfsInputStream with the client instance | ||
| AbfsInputStream inputStream = new AbfsInputStream( | ||
| abfsClient, | ||
|
|
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception { | |
| checkEvictedStatus(inputStream, 0, true); | ||
| } | ||
|
|
||
| /** | ||
| * This test expects InProgressList is not purged by the inputStream close. | ||
| * The readBuffer will move to completedList and then finally should get evicted. | ||
| */ | ||
| @Test | ||
| public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { | ||
| AbfsClient client = getMockAbfsClient(); | ||
| AbfsRestOperation successOp = getMockRestOp(); | ||
|
|
||
| Mockito.doAnswer(invocationOnMock -> { | ||
| //sleeping thread to mock the network latency from client to backend. | ||
| Thread.sleep(3000l); | ||
| return successOp; | ||
| }) | ||
| .when(client) | ||
|
||
| .read(any(String.class), any(Long.class), any(byte[].class), | ||
| any(Integer.class), any(Integer.class), any(String.class), | ||
| any(String.class), any(TracingContext.class)); | ||
|
|
||
| AbfsInputStream inputStream = getAbfsInputStream(client, | ||
| "testSuccessfulReadAhead.txt"); | ||
| queueReadAheads(inputStream); | ||
|
|
||
| final ReadBufferManager readBufferManager | ||
| = ReadBufferManager.getBufferManager(); | ||
|
|
||
| //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. | ||
| Thread.sleep(1000l); | ||
|
||
|
|
||
| Assertions.assertThat( | ||
| getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), | ||
| inputStream)) | ||
| .describedAs("InProgressList should have 3 elements") | ||
| .isEqualTo(3); | ||
| Assertions.assertThat(readBufferManager.getFreeListCopy().size()) | ||
|
||
| .describedAs("FreeList should have 13 elements") | ||
| .isEqualTo(13); | ||
| Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size()) | ||
| .describedAs("CompletedList should have 0 elements") | ||
| .isEqualTo(0); | ||
|
|
||
| inputStream.close(); | ||
|
||
|
|
||
| Assertions.assertThat( | ||
| getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), | ||
| inputStream)) | ||
| .describedAs("InProgressList should have 3 elements") | ||
| .isEqualTo(3); | ||
| Assertions.assertThat(getStreamRelatedBufferCount( | ||
| readBufferManager.getCompletedReadListCopy(), inputStream)) | ||
| .describedAs("CompletedList should have 0 elements") | ||
| .isEqualTo(0); | ||
| Assertions.assertThat(readBufferManager.getFreeListCopy().size()) | ||
|
||
| .describedAs("FreeList should have 13 elements") | ||
| .isEqualTo(13); | ||
|
|
||
| //Sleep so that response from mockedClient gets back to ReadBufferWorker and | ||
| // can populate into completedList. | ||
| Thread.sleep(3000l); | ||
|
|
||
| Assertions.assertThat(getStreamRelatedBufferCount( | ||
| readBufferManager.getCompletedReadListCopy(), inputStream)) | ||
| .describedAs("CompletedList should have 3 elements") | ||
| .isEqualTo(3); | ||
| Assertions.assertThat(readBufferManager.getFreeListCopy().size()) | ||
| .describedAs("FreeList should have 13 elements") | ||
| .isEqualTo(13); | ||
| Assertions.assertThat( | ||
| getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), | ||
| inputStream)) | ||
| .describedAs("InProgressList should have 0 elements") | ||
| .isEqualTo(0); | ||
|
|
||
| Thread.sleep(readBufferManager.getThresholdAgeMilliseconds()); | ||
|
|
||
| readBufferManager.callTryEvict(); | ||
| readBufferManager.callTryEvict(); | ||
| readBufferManager.callTryEvict(); | ||
|
|
||
| Assertions.assertThat(getStreamRelatedBufferCount( | ||
| readBufferManager.getCompletedReadListCopy(), inputStream)) | ||
| .describedAs("CompletedList should have 0 elements") | ||
| .isEqualTo(0); | ||
| Assertions.assertThat(readBufferManager.getFreeListCopy().size()) | ||
|
||
| .describedAs("FreeList should have 16 elements") | ||
| .isEqualTo(16); | ||
| } | ||
|
|
||
| private int getStreamRelatedBufferCount(final List<ReadBuffer> bufferList, | ||
| final AbfsInputStream inputStream) { | ||
| int count = 0; | ||
|
||
| for (ReadBuffer buffer : bufferList) { | ||
| if (buffer.getStream() == inputStream) { | ||
| count++; | ||
| } | ||
| } | ||
| return count; | ||
| } | ||
|
|
||
| /** | ||
| * This test expects ReadAheadManager to throw exception if the read ahead | ||
| * thread had failed within the last thresholdAgeMilliseconds. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.