Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -74,17 +75,14 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
} finally {
executorService.shutdown();
// wait for all tasks to finish
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executorService.shutdown does an orderly shutdown of the task. It does not wait for the tasks to be completed. So, the main thread after executing line 76 will go to line 79, and assertions will happen where the execution of tasks may or may not have got completed.

Requesting you to kindly change to https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#awaitTermination method which will wait for the tasks to be completed. Thanks.

Ref: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#shutdown()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah. yes, will do that. we still need to cut the assertions about completed list and buffers returned, as any in-progress read will finish and add the buffer which will then stay until the threshold timeout


ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
// verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
Assertions.assertThat(bufferManager.getFreeListCopy())
.describedAs("After closing all streams free list contents should match with " + freeList)
.hasSize(numBuffers)
.containsExactlyInAnyOrderElementsOf(freeList);

}

private void assertListEmpty(String listName, List<ReadBuffer> list) {
Expand Down Expand Up @@ -116,22 +114,18 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
try {
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
iStream2.read();
// After closing stream1, none of the buffers associated with stream1 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
// After closing stream1, no queued buffers of stream1 should be present
// assertions can't be made about the state of the other lists as it is
// too prone to race conditions.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
} finally {
// closing the stream later.
IOUtils.closeStream(iStream2);
}
// After closing stream2, none of the buffers associated with stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
// After closing stream2, no queued buffers of stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);

// After closing both the streams, all lists should be empty.
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
// After closing both the streams, read queue should be empty.
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());

}
Expand Down