-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-17113. Adding ReadAhead Counters in ABFS #2154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,10 @@ public class ITestAbfsInputStreamStatistics | |
| LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class); | ||
| private static final int ONE_MB = 1024 * 1024; | ||
| private static final int ONE_KB = 1024; | ||
| private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024; | ||
| private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE; | ||
| private static final int THREAD_SLEEP_10_SECONDS = 10; | ||
| private static final int TIMEOUT_30_SECONDS = 30000; | ||
| private byte[] defBuffer = new byte[ONE_MB]; | ||
|
|
||
| public ITestAbfsInputStreamStatistics() throws Exception { | ||
|
|
@@ -75,6 +79,8 @@ public void testInitValues() throws IOException { | |
| checkInitValue(stats.getReadOperations(), "readOps"); | ||
| checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer"); | ||
| checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps"); | ||
| checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead"); | ||
| checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead"); | ||
|
|
||
| } finally { | ||
| IOUtils.cleanupWithLogger(LOG, outputStream, inputStream); | ||
|
|
@@ -285,6 +291,98 @@ public void testWithNullStreamStatistics() throws IOException { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Testing readAhead counters in AbfsInputStream with 30 seconds timeout. | ||
| */ | ||
| @Test(timeout = TIMEOUT_30_SECONDS) | ||
| public void testReadAheadCounters() throws IOException { | ||
| describe("Test to check correct values for readAhead counters in " | ||
| + "AbfsInputStream"); | ||
|
|
||
| AzureBlobFileSystem fs = getFileSystem(); | ||
| AzureBlobFileSystemStore abfss = fs.getAbfsStore(); | ||
| Path readAheadCountersPath = path(getMethodName()); | ||
|
|
||
| /* | ||
| * Setting the block size for readAhead as 4KB. | ||
| */ | ||
| abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE); | ||
|
|
||
| AbfsOutputStream out = null; | ||
| AbfsInputStream in = null; | ||
|
|
||
| try { | ||
|
|
||
| /* | ||
| * Creating a file of 1MB size. | ||
| */ | ||
| out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath); | ||
| out.write(defBuffer); | ||
| out.close(); | ||
|
|
||
| in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics()); | ||
|
|
||
| /* | ||
| * Reading 1KB after each i * KB positions. Hence the reads are from 0 | ||
| * to 1KB, 1KB to 2KB, and so on.. for 5 operations. | ||
| */ | ||
| for (int i = 0; i < 5; i++) { | ||
| in.seek(ONE_KB * i); | ||
| in.read(defBuffer, ONE_KB * i, ONE_KB); | ||
| } | ||
| AbfsInputStreamStatisticsImpl stats = | ||
| (AbfsInputStreamStatisticsImpl) in.getStreamStatistics(); | ||
|
|
||
| /* | ||
| * Since, readAhead is done in background threads. Sometimes, the | ||
| * threads aren't finished in the background and could result in | ||
| * inaccurate results. So, we wait till we have the accurate values | ||
| * with a limit of 30 seconds as that's when the test times out. | ||
| * | ||
| */ | ||
| while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE | ||
| || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) { | ||
| Thread.sleep(THREAD_SLEEP_10_SECONDS); | ||
| } | ||
|
|
||
| /* | ||
| * Verifying the counter values of readAheadBytesRead and remoteBytesRead. | ||
| * | ||
| * readAheadBytesRead : Since, we read 1KBs 5 times, that means we go | ||
| * from 0 to 5KB in the file. The bufferSize is set to 4KB, and since | ||
| * we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB | ||
| * buffer. Our read is till 5KB, hence readAhead would ideally read 2 | ||
| * blocks of 4KB which is equal to 8KB. But, sometimes to get more than | ||
| * one block from readAhead buffer we might have to wait for background | ||
| * threads to fill the buffer and hence we might do remote read which | ||
| * would be faster. Therefore, readAheadBytesRead would be equal to or | ||
| * greater than 4KB. | ||
| * | ||
| * remoteBytesRead : Since, the bufferSize is set to 4KB and the number | ||
| * of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4 | ||
| * KB buffer on the first read, which is equal to 32KB. But, if we are not | ||
| * able to read some bytes that were in the buffer after doing | ||
| * readAhead, we might use remote read again. Thus, the bytes read | ||
| * remotely could also be greater than 32Kb. | ||
| * | ||
| */ | ||
| assertTrue(String.format("actual value of %d is not greater than or " | ||
|
||
| + "equal to the expected value %d for readAheadReadBytes counter", | ||
| stats.getReadAheadBytesRead(), CUSTOM_BLOCK_BUFFER_SIZE), | ||
| stats.getReadAheadBytesRead() >= CUSTOM_BLOCK_BUFFER_SIZE); | ||
|
|
||
| assertTrue(String.format("actual value of %d is not greater than or " | ||
| + "equal to the expected value %d for remoteBytesRead counter", | ||
| stats.getRemoteBytesRead(), CUSTOM_READ_AHEAD_BUFFER_SIZE), | ||
| stats.getRemoteBytesRead() >= CUSTOM_READ_AHEAD_BUFFER_SIZE); | ||
|
|
||
| } catch (InterruptedException e) { | ||
| e.printStackTrace(); | ||
|
||
| } finally { | ||
| IOUtils.cleanupWithLogger(LOG, out, in); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Method to assert the initial values of the statistics. | ||
| * | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice explanation