From 152dca05203c677e33bc31611199b869e77d2150 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 10 Aug 2021 14:42:49 +0530 Subject: [PATCH 1/4] HADOOP-17156. Purging the buffers associated with input streams during close() --- .../fs/azurebfs/services/AbfsInputStream.java | 3 +- .../azurebfs/services/ReadBufferManager.java | 88 ++++++++++--- .../services/ITestReadBufferManager.java | 119 ++++++++++++++++++ 3 files changed, 193 insertions(+), 17 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index c98819cadc489..7033ae9a4a039 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -693,9 +693,10 @@ public boolean seekToNewSource(long l) throws IOException { @Override public synchronized void close() throws IOException { + LOG.debug("Closing {}", this); closed = true; buffer = null; // de-reference the buffer so it can be GC'ed sooner - LOG.debug("Closing {}", this); + ReadBufferManager.getBufferManager().purgeBuffersForStream(this); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index db4ec941c5e30..19c5d2a23152e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -22,11 +22,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -456,18 +452,23 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); } synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setLength(bytesActuallyRead); - } else { - freeList.push(buffer.getBufferindex()); - // buffer will be deleted as per the eviction policy. + // If this buffer has already been purged during + // close of InputStream then we don't update the lists. + if (inProgressList.contains(buffer)) { + inProgressList.remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setLength(bytesActuallyRead); + } else { + freeList.push(buffer.getBufferindex()); + // buffer will be deleted as per the eviction policy. + } + // completed list also contains FAILED read buffers + // for sending exception message to clients. + buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); + completedReadList.add(buffer); } - - buffer.setStatus(result); - buffer.setTimeStamp(currentTimeMillis()); - completedReadList.add(buffer); } //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results @@ -502,11 +503,66 @@ int getCompletedReadListSize() { return completedReadList.size(); } + @VisibleForTesting + public LinkedList getCompletedReadListCopy() { + return new LinkedList<>(completedReadList); + } + + @VisibleForTesting + public LinkedList getFreeListCopy() { + return new LinkedList<>(freeList); + } + + @VisibleForTesting + public LinkedList getReadAheadQueueCopy() { + return new LinkedList<>(readAheadQueue); + } + + @VisibleForTesting + public LinkedList getInProgressCopiedList() { + return new LinkedList<>(inProgressList); + } + @VisibleForTesting void callTryEvict() { tryEvict(); } + + /** + * Purging the buffers associated with an {@link AbfsInputStream} + * from {@link ReadBufferManager} when stream is closed. + * @param stream input stream. + */ + public synchronized void purgeBuffersForStream(AbfsInputStream stream) { + LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); + readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); + purgeList(stream, completedReadList); + purgeList(stream, inProgressList); + } + + /** + * Method to remove buffers associated with a {@link AbfsInputStream} + * when its close method is called. + * As failed ReadBuffers (bufferIndex = -1) are already pushed to free + * list in {@link this#doneReading(ReadBuffer, ReadBufferStatus, int)}, + * we will skip adding those here again. + * @param stream associated input stream. + * @param list list of buffers like {@link this#completedReadList} + * or {@link this#inProgressList}. + */ + private void purgeList(AbfsInputStream stream, LinkedList list) { + for (Iterator it = list.iterator(); it.hasNext();) { + ReadBuffer readBuffer = it.next(); + if (readBuffer.getStream() == stream) { + it.remove(); + if (readBuffer.getBufferindex() != -1) { + freeList.push(readBuffer.getBufferindex()); + } + } + } + } + /** * Test method that can clean up the current state of readAhead buffers and * the lists. Will also trigger a fresh init. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java new file mode 100644 index 0000000000000..b8f1a1027e779 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -0,0 +1,119 @@ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; + +public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { + + public ITestReadBufferManager() throws Exception { + } + + @Test + public void testPurgeBufferManagerForParallelStreams() throws Exception { + describe("Testing purging of buffers from ReadBufferManager for " + + "parallel input streams"); + final int numBuffers = 16; + final LinkedList freeList = new LinkedList<>(); + for (int i=0; i < numBuffers; i++) { + freeList.add(i); + } + AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + for(int i=0; i < 4; i++) { + String fileName = methodName.getMethodName() + i; + byte[] fileContent = getRandomBytesArray(ONE_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + try (FSDataInputStream iStream = fs.open(testFilePath)) { + iStream.read(); + } + } + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + Assertions.assertThat(bufferManager.getCompletedReadListCopy().size()) + .describedAs("After closing all streams completed list size should be 0") + .isEqualTo(0); + + Assertions.assertThat(bufferManager.getInProgressCopiedList().size()) + .describedAs("After closing all streams inProgress list size should be 0") + .isEqualTo(0); + Assertions.assertThat(bufferManager.getFreeListCopy()) + .describedAs("After closing all streams free list contents should match with " + freeList) + .hasSize(numBuffers) + .containsExactlyInAnyOrderElementsOf(freeList); + Assertions.assertThat(bufferManager.getReadAheadQueueCopy()) + .describedAs("After closing all stream ReadAheadQueue should be empty") + .hasSize(0); + + } + + @Test + public void testPurgeBufferManagerForSequentialStream() throws Exception { + describe("Testing purging of buffers in ReadBufferManager for " + + "sequential input streams"); + AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + final String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(ONE_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + AbfsInputStream iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + iStream1.read(); + // closing the stream right away. + iStream1.close(); + AbfsInputStream iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + iStream2.read(); + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); + assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); + assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); + // closing the stream later. + iStream2.close(); + assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2); + assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2); + assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); + + } + + + private void assertListDoesnotContainBuffersForIstream(LinkedList list, + AbfsInputStream inputStream) { + for (ReadBuffer buffer : list) { + Assertions.assertThat(buffer.getStream()) + .describedAs("Buffers associated with closed input streams shouldn't be present") + .isNotEqualTo(inputStream); + } + } + + private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8); + conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); + conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE); + return getFileSystem(conf); + } + + protected byte[] getRandomBytesArray(int length) { + final byte[] b = new byte[length]; + new Random().nextBytes(b); + return b; + } + + protected Path createFileWithContent(FileSystem fs, String fileName, + byte[] fileContent) throws IOException { + Path testFilePath = path(fileName); + try (FSDataOutputStream oStream = fs.create(testFilePath)) { + oStream.write(fileContent); + oStream.flush(); + } + return testFilePath; + } +} From 21bcb9819c83578f1c1e95233bccb336a9afbef5 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Wed, 11 Aug 2021 17:11:22 +0530 Subject: [PATCH 2/4] HADOOP-17156. Review comments --- .../azurebfs/services/ReadBufferManager.java | 31 +++-- .../services/ITestReadBufferManager.java | 114 +++++++++++++----- 2 files changed, 102 insertions(+), 43 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 19c5d2a23152e..456ca077cacc4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -22,7 +22,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -504,23 +510,23 @@ int getCompletedReadListSize() { } @VisibleForTesting - public LinkedList getCompletedReadListCopy() { - return new LinkedList<>(completedReadList); + public synchronized List getCompletedReadListCopy() { + return new ArrayList<>(completedReadList); } @VisibleForTesting - public LinkedList getFreeListCopy() { - return new LinkedList<>(freeList); + public synchronized List getFreeListCopy() { + return new ArrayList<>(freeList); } @VisibleForTesting - public LinkedList getReadAheadQueueCopy() { - return new LinkedList<>(readAheadQueue); + public synchronized List getReadAheadQueueCopy() { + return new ArrayList<>(readAheadQueue); } @VisibleForTesting - public LinkedList getInProgressCopiedList() { - return new LinkedList<>(inProgressList); + public synchronized List getInProgressCopiedList() { + return new ArrayList<>(inProgressList); } @VisibleForTesting @@ -544,9 +550,8 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) { /** * Method to remove buffers associated with a {@link AbfsInputStream} * when its close method is called. - * As failed ReadBuffers (bufferIndex = -1) are already pushed to free - * list in {@link this#doneReading(ReadBuffer, ReadBufferStatus, int)}, - * we will skip adding those here again. + * NOTE: This method is not threadsafe and must be called inside a + * synchronised block. See caller. * @param stream associated input stream. * @param list list of buffers like {@link this#completedReadList} * or {@link this#inProgressList}. @@ -556,6 +561,8 @@ private void purgeList(AbfsInputStream stream, LinkedList list) { ReadBuffer readBuffer = it.next(); if (readBuffer.getStream() == stream) { it.remove(); + // As failed ReadBuffers (bufferIndex = -1) are already pushed to free + // list in doneReading method, we will skip adding those here again. if (readBuffer.getBufferindex() != -1) { freeList.push(readBuffer.getBufferindex()); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index b8f1a1027e779..1deca9dc949f9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.azurebfs.services; import org.apache.hadoop.conf.Configuration; @@ -7,14 +25,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.io.IOUtils; import org.assertj.core.api.Assertions; import org.junit.Test; import java.io.IOException; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { @@ -30,33 +57,41 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { for (int i=0; i < numBuffers; i++) { freeList.add(i); } + ExecutorService executorService = Executors.newFixedThreadPool(4); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); - for(int i=0; i < 4; i++) { - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(ONE_MB); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - try (FSDataInputStream iStream = fs.open(testFilePath)) { - iStream.read(); + try { + for (int i = 0; i < 4; i++) { + final String fileName = methodName.getMethodName() + i; + executorService.submit((Callable) () -> { + byte[] fileContent = getRandomBytesArray(ONE_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + try (FSDataInputStream iStream = fs.open(testFilePath)) { + iStream.read(); + } + return null; + }); } + } finally { + executorService.shutdown(); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - Assertions.assertThat(bufferManager.getCompletedReadListCopy().size()) - .describedAs("After closing all streams completed list size should be 0") - .isEqualTo(0); - Assertions.assertThat(bufferManager.getInProgressCopiedList().size()) - .describedAs("After closing all streams inProgress list size should be 0") - .isEqualTo(0); + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); + assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); + assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); Assertions.assertThat(bufferManager.getFreeListCopy()) .describedAs("After closing all streams free list contents should match with " + freeList) .hasSize(numBuffers) .containsExactlyInAnyOrderElementsOf(freeList); - Assertions.assertThat(bufferManager.getReadAheadQueueCopy()) - .describedAs("After closing all stream ReadAheadQueue should be empty") - .hasSize(0); } + private void assertListEmpty(String listName, List list) { + Assertions.assertThat(list) + .describedAs("After closing all streams %s should be empty", listName) + .hasSize(0); + } + @Test public void testPurgeBufferManagerForSequentialStream() throws Exception { describe("Testing purging of buffers in ReadBufferManager for " + @@ -65,26 +100,43 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { final String fileName = methodName.getMethodName(); byte[] fileContent = getRandomBytesArray(ONE_MB); Path testFilePath = createFileWithContent(fs, fileName, fileContent); - AbfsInputStream iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); - iStream1.read(); - // closing the stream right away. - iStream1.close(); - AbfsInputStream iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); - iStream2.read(); + + AbfsInputStream iStream1 = null; + // stream1 will be closed right away. + try { + iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + // Just reading one byte will trigger all read ahead calls. + iStream1.read(); + } finally { + IOUtils.closeStream(iStream1); + } ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); - assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); - // closing the stream later. - iStream2.close(); + AbfsInputStream iStream2 = null; + try { + iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + iStream2.read(); + // After closing stream1, none of the buffers associated with stream1 should be present. + assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); + assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); + assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); + } finally { + // closing the stream later. + IOUtils.closeStream(iStream2); + } + // After closing stream2, none of the buffers associated with stream2 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2); assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2); assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); + // After closing both the streams, all lists should be empty. + assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); + assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); + assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + } - private void assertListDoesnotContainBuffersForIstream(LinkedList list, + private void assertListDoesnotContainBuffersForIstream(List list, AbfsInputStream inputStream) { for (ReadBuffer buffer : list) { Assertions.assertThat(buffer.getStream()) From 84179a3e4547257e4d3fb37a88be20224cf64a77 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Thu, 19 Aug 2021 19:46:36 +0530 Subject: [PATCH 3/4] Moving new test to run in seperate jvm and creating new Azure FS instances in tests --- hadoop-tools/hadoop-azure/pom.xml | 2 ++ .../fs/azurebfs/services/ITestReadBufferManager.java | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index cc773ab777fc3..1896e15d27018 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -555,6 +555,7 @@ **/azurebfs/ITestAzureBlobFileSystemListStatus.java **/azurebfs/extensions/ITestAbfsDelegationTokens.java **/azurebfs/ITestSmallWriteOptimization.java + **/azurebfs/services/ITestReadBufferManager.java @@ -595,6 +596,7 @@ **/azurebfs/ITestAzureBlobFileSystemListStatus.java **/azurebfs/extensions/ITestAbfsDelegationTokens.java **/azurebfs/ITestSmallWriteOptimization.java + **/azurebfs/services/ITestReadBufferManager.java diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 1deca9dc949f9..15bb17a14ecdb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -50,8 +50,8 @@ public ITestReadBufferManager() throws Exception { @Test public void testPurgeBufferManagerForParallelStreams() throws Exception { - describe("Testing purging of buffers from ReadBufferManager for " + - "parallel input streams"); + describe("Testing purging of buffers from ReadBufferManager for " + + "parallel input streams"); final int numBuffers = 16; final LinkedList freeList = new LinkedList<>(); for (int i=0; i < numBuffers; i++) { @@ -94,8 +94,8 @@ private void assertListEmpty(String listName, List list) { @Test public void testPurgeBufferManagerForSequentialStream() throws Exception { - describe("Testing purging of buffers in ReadBufferManager for " + - "sequential input streams"); + describe("Testing purging of buffers in ReadBufferManager for " + + "sequential input streams"); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); final String fileName = methodName.getMethodName(); byte[] fileContent = getRandomBytesArray(ONE_MB); @@ -150,7 +150,7 @@ private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception { conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8); conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE); - return getFileSystem(conf); + return (AzureBlobFileSystem) FileSystem.newInstance(conf); } protected byte[] getRandomBytesArray(int length) { From 5066471e3da3c55b22a6af9e6b8535e635783ee1 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Mon, 6 Sep 2021 13:52:34 +0530 Subject: [PATCH 4/4] HADOOP-17156 import new line --- .../services/ITestReadBufferManager.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 15bb17a14ecdb..705cc2530d335 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -18,6 +18,14 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -26,17 +34,10 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.io.IOUtils; + import org.assertj.core.api.Assertions; import org.junit.Test; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;