diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index e0ca9525db643..8c8f18f2a2f7f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -368,8 +368,12 @@ public boolean delete(final Path f, final boolean recursive) throws IOException @Override public ContentSummary getContentSummary(Path f) throws IOException { - org.apache.hadoop.fs.azurebfs.utils.ContentSummary contentSummary = - (new ContentSummaryProcessor(abfsStore)).getContentSummary(f); + org.apache.hadoop.fs.azurebfs.utils.ContentSummary contentSummary = null; + try { + contentSummary = (new ContentSummaryProcessor(abfsStore)).getContentSummary(f); + } catch (InterruptedException e) { + e.printStackTrace(); + } return new Builder().length(contentSummary.getLength()) .directoryCount(contentSummary.getDirectoryCount()) .fileCount(contentSummary.getFileCount()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java index df0042392b881..ed716d74fec04 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java @@ -24,13 +24,16 @@ import org.apache.hadoop.fs.azurebfs.utils.ContentSummary; import java.io.IOException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class ContentSummaryProcessor { private final AtomicLong fileCount = new AtomicLong(0L); private final AtomicLong directoryCount = new AtomicLong(0L); private final AtomicLong totalBytes = new AtomicLong(0L); - private final ProcessingQueue queue = new ProcessingQueue<>(); + private final LinkedBlockingDeque queue = + new LinkedBlockingDeque<>(); private final AzureBlobFileSystemStore abfsStore; private static final int NUM_THREADS = 16; @@ -38,9 +41,10 @@ public ContentSummaryProcessor(AzureBlobFileSystemStore abfsStore) { this.abfsStore = abfsStore; } - public ContentSummary getContentSummary(Path path) throws IOException { + public ContentSummary getContentSummary(Path path) + throws IOException, InterruptedException { processDirectoryTree(path); - Thread[] threads = new Thread[16]; + Thread[] threads = new Thread[NUM_THREADS]; for (int i = 0; i < NUM_THREADS; ++i) { threads[i] = new Thread(new ContentSummaryProcessor.ThreadProcessor()); @@ -58,12 +62,13 @@ public ContentSummary getContentSummary(Path path) throws IOException { fileCount.get(), totalBytes.get()); } - private void processDirectoryTree(Path path) throws IOException { + private void processDirectoryTree(Path path) + throws IOException, InterruptedException { FileStatus[] fileStatuses = abfsStore.listStatus(path); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isDirectory()) { this.processDirectory(); - this.queue.add(fileStatus); + this.queue.put(fileStatus); } else { this.processFile(fileStatus); } @@ -86,16 +91,14 @@ private ThreadProcessor() { public void run() { try { FileStatus fileStatus; - while ((fileStatus = ContentSummaryProcessor.this.queue.poll()) - != null) { - if (fileStatus.isDirectory()) { - ContentSummaryProcessor.this - .processDirectoryTree(fileStatus.getPath()); - } - ContentSummaryProcessor.this.queue.unregister(); + fileStatus = queue.poll(3, TimeUnit.SECONDS); + if (fileStatus == null) + return; + if (fileStatus.isDirectory()) { + processDirectoryTree(fileStatus.getPath()); } - } catch (IOException e) { - throw new RuntimeException("IOException processing Directory tree", e); + } catch (InterruptedException | IOException interruptedException) { + interruptedException.printStackTrace(); } } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ProcessingQueue.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ProcessingQueue.java deleted file mode 100644 index 3e7d5ae07a24d..0000000000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ProcessingQueue.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.util.LinkedList; -import java.util.Queue; - -public class ProcessingQueue { - - private final Queue internalQueue = new LinkedList<>(); - private int processorCount = 0; - - ProcessingQueue() { - } - - public synchronized void add(T item) { - if (item == null) { - throw new IllegalArgumentException("Cannot put null into queue"); - } else { - this.internalQueue.add(item); - this.notifyAll(); - } - } - - public synchronized T poll() { - while (true) { - try { - if (this.isQueueEmpty() && !this.done()) { - this.wait(); - continue; - } - if (!this.isQueueEmpty()) { - ++this.processorCount; - return this.internalQueue.poll(); - } - return null; - } catch (InterruptedException var2) { - Thread.currentThread().interrupt(); - } - return null; - } - } - - public synchronized void unregister() { - --this.processorCount; - if (this.processorCount < 0) { - throw new IllegalStateException( - "too many unregister()'s. processorCount is now " - + this.processorCount); - } else { - if (this.done()) { - this.notifyAll(); - } - } - } - - private boolean done() { - return this.processorCount == 0 && this.isQueueEmpty(); - } - - private boolean isQueueEmpty() { - return this.internalQueue.peek() == null; - } -} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestGetContentSummary.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestGetContentSummary.java index b002165071be4..f26f789b05285 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestGetContentSummary.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestGetContentSummary.java @@ -36,12 +36,30 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS; public class TestGetContentSummary extends AbstractAbfsIntegrationTest { - private final String[] directories = {"testFolder", "testFolder/testFolder1", - "testFolder/testFolder2", "testFolder/testFolder3", "testFolderII", - "testFolder/testFolder2/testFolder4", - "testFolder/testFolder2/testFolder5", - "testFolder/testFolder3/testFolder6", - "testFolder/testFolder3/testFolder7"}; + + private final String[] directories = {"/testFolder", + "/testFolder/testFolder1", + "/testFolder/testFolder2", "/testFolder/testFolder3", "/testFolderII", + "/testFolder/testFolder2/testFolder4", + "/testFolder/testFolder2/testFolder5", + "/testFolder/testFolder3/testFolder6", + "/testFolder/testFolder3/testFolder7", + "/testFolder/testFolder3/testFolder6/leafDir", + "/testFolderII/listMaxDir", + "/testFolderII/listMaxDir/zFolder"}; + //thread poll should not get interrupted before zFolder is put in queue + + private final Path pathToFile = new Path("/testFolder/test1");; + private final Path pathToListMaxDir = new Path("/testFolderII/listMaxDir"); + private final Path pathToLeafDir = + new Path("/testFolder/testFolder3/testFolder6/leafDir"); + private final Path pathToIntermediateDirWithFilesOnly = new Path( + "/testFolder/testFolder2/testFolder5"); + private final Path pathToIntermediateDirWithFilesAndSubdirs = new Path( + "/testFolder/testFolder3"); + private final String[] dirsWithNonEmptyFiles = {"/testFolder", "/testFolder/testFolder1", + "/testFolder/testFolder2/testFolder5", "/testFolder/testFolder3"}; + private final AzureBlobFileSystem fs = createFileSystem(); private final int testBufferSize = 20; private final int filesPerDirectory = 2; @@ -55,84 +73,49 @@ public TestGetContentSummary() throws Exception { } @Test - public void testFilesystemRoot() throws IOException { + public void testFilesystemRoot() + throws IOException { + int fileCount = + (directories.length - 2) * filesPerDirectory + numFilesForListMaxTest; ContentSummary contentSummary = fs.getContentSummary(new Path("/")); - checkContentSummary(contentSummary, directories.length, - directories.length * filesPerDirectory, 0); + checkContentSummary(contentSummary, directories.length, fileCount, + dirsWithNonEmptyFiles.length * filesPerDirectory * testBufferSize); } @Test public void testFileContentSummary() throws IOException { - Path filePath = new Path("/testFolderII/testFile"); - FSDataOutputStream out = fs.create(filePath); - out.write(b); - out.close(); - ContentSummary contentSummary = fs.getContentSummary(filePath); + ContentSummary contentSummary = fs.getContentSummary(pathToFile); checkContentSummary(contentSummary, 0, 1, testBufferSize); } @Test public void testLeafDir() throws IOException { - Path pathToLeafDir = new Path( - "/testFolder/testFolder2/testFolder4" + "/leafDir"); - fs.mkdirs(pathToLeafDir); ContentSummary contentSummary = fs.getContentSummary(pathToLeafDir); checkContentSummary(contentSummary, 0, 0, 0); } @Test public void testIntermediateDirWithFilesOnly() throws IOException { - String dirPath = "/testFolder/testFolder3/testFolder6"; - for (int i = 0; i < filesPerDirectory; i++) { - FSDataOutputStream out = fs.append(new Path(dirPath + "/test" + i)); - out.write(b); - out.close(); - } - ContentSummary contentSummary = fs.getContentSummary(new Path(dirPath)); + ContentSummary contentSummary = + fs.getContentSummary(pathToIntermediateDirWithFilesOnly); checkContentSummary(contentSummary, 0, filesPerDirectory, testBufferSize * filesPerDirectory); } @Test public void testIntermediateDirWithFilesAndSubdirs() throws IOException { - Path dirPath = new Path("/testFolder/testFolder3"); - for (int i = 0; i < filesPerDirectory; i++) { - FSDataOutputStream out = fs.append(new Path(dirPath + "/test" + i)); - out.write(b); - out.close(); - } - Path dir2Path = new Path("/testFolder/testFolder3/testFolder6"); - for (int i = 0; i < filesPerDirectory; i++) { - FSDataOutputStream out = fs.append(new Path(dir2Path + "/test" + i)); - out.write(b); - out.close(); - } - ContentSummary contentSummary = fs.getContentSummary(dirPath); - checkContentSummary(contentSummary, 2, 3 * filesPerDirectory, - testBufferSize * 2 * 2); - } - - @Test - public void testEmptyDir() throws IOException { - Path pathToEmptyDir = new Path("/testFolder/emptyDir"); - fs.mkdirs(pathToEmptyDir); - ContentSummary contentSummary = fs.getContentSummary(pathToEmptyDir); - checkContentSummary(contentSummary, 0, 0, 0); + ContentSummary contentSummary = + fs.getContentSummary(pathToIntermediateDirWithFilesAndSubdirs); + checkContentSummary(contentSummary, 3, 3 * filesPerDirectory, + testBufferSize * filesPerDirectory); } @Test public void testDirOverListMaxResultsItems() throws IOException, ExecutionException, InterruptedException { - Path pathToDir = new Path("/testFolder/testFolder2/maxListDir"); - fs.mkdirs(pathToDir); - populateDirWithFiles(pathToDir, numFilesForListMaxTest); - FSDataOutputStream out = fs.append(new Path(pathToDir + "/test0")); - out.write(b); - out.close(); checkContentSummary( - fs.getContentSummary(new Path("/testFolder" + "/testFolder2")), 3, - numFilesForListMaxTest + filesPerDirectory * 3, - testBufferSize); + fs.getContentSummary(pathToListMaxDir), 1, + numFilesForListMaxTest + filesPerDirectory, 0); } private void checkContentSummary(ContentSummary contentSummary, @@ -150,10 +133,21 @@ private void checkContentSummary(ContentSummary contentSummary, private void createDirectoryStructure() throws IOException, ExecutionException, InterruptedException { for (String directory : directories) { - Path dirPath = new Path("/" + directory); + Path dirPath = new Path(directory); fs.mkdirs(dirPath); - populateDirWithFiles(dirPath, filesPerDirectory); + if (!(dirPath.equals(pathToLeafDir) || dirPath + .equals(pathToListMaxDir))) { + populateDirWithFiles(dirPath, filesPerDirectory); + } + } + for (String dir : dirsWithNonEmptyFiles) { + for (int i = 0; i < filesPerDirectory; i++) { + FSDataOutputStream out = fs.append(new Path(dir + "/test" + i)); + out.write(b); + out.close(); + } } + populateDirWithFiles(pathToListMaxDir, numFilesForListMaxTest); } private void populateDirWithFiles(Path directory, int numFiles)