Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,12 @@ public boolean delete(final Path f, final boolean recursive) throws IOException

@Override
public ContentSummary getContentSummary(Path f) throws IOException {
org.apache.hadoop.fs.azurebfs.utils.ContentSummary contentSummary =
(new ContentSummaryProcessor(abfsStore)).getContentSummary(f);
org.apache.hadoop.fs.azurebfs.utils.ContentSummary contentSummary = null;
try {
contentSummary = (new ContentSummaryProcessor(abfsStore)).getContentSummary(f);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Builder().length(contentSummary.getLength())
.directoryCount(contentSummary.getDirectoryCount())
.fileCount(contentSummary.getFileCount())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,27 @@
import org.apache.hadoop.fs.azurebfs.utils.ContentSummary;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ContentSummaryProcessor {
private final AtomicLong fileCount = new AtomicLong(0L);
private final AtomicLong directoryCount = new AtomicLong(0L);
private final AtomicLong totalBytes = new AtomicLong(0L);
private final ProcessingQueue<FileStatus> queue = new ProcessingQueue<>();
private final LinkedBlockingDeque<FileStatus> queue =
new LinkedBlockingDeque<>();
private final AzureBlobFileSystemStore abfsStore;
private static final int NUM_THREADS = 16;

public ContentSummaryProcessor(AzureBlobFileSystemStore abfsStore) {
this.abfsStore = abfsStore;
}

public ContentSummary getContentSummary(Path path) throws IOException {
public ContentSummary getContentSummary(Path path)
throws IOException, InterruptedException {
processDirectoryTree(path);
Thread[] threads = new Thread[16];
Thread[] threads = new Thread[NUM_THREADS];

for (int i = 0; i < NUM_THREADS; ++i) {
threads[i] = new Thread(new ContentSummaryProcessor.ThreadProcessor());
Expand All @@ -58,12 +62,13 @@ public ContentSummary getContentSummary(Path path) throws IOException {
fileCount.get(), totalBytes.get());
}

private void processDirectoryTree(Path path) throws IOException {
private void processDirectoryTree(Path path)
throws IOException, InterruptedException {
FileStatus[] fileStatuses = abfsStore.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
this.processDirectory();
this.queue.add(fileStatus);
this.queue.put(fileStatus);
} else {
this.processFile(fileStatus);
}
Expand All @@ -86,16 +91,14 @@ private ThreadProcessor() {
public void run() {
try {
FileStatus fileStatus;
while ((fileStatus = ContentSummaryProcessor.this.queue.poll())
!= null) {
if (fileStatus.isDirectory()) {
ContentSummaryProcessor.this
.processDirectoryTree(fileStatus.getPath());
}
ContentSummaryProcessor.this.queue.unregister();
fileStatus = queue.poll(3, TimeUnit.SECONDS);
if (fileStatus == null)
return;
if (fileStatus.isDirectory()) {
processDirectoryTree(fileStatus.getPath());
}
} catch (IOException e) {
throw new RuntimeException("IOException processing Directory tree", e);
} catch (InterruptedException | IOException interruptedException) {
interruptedException.printStackTrace();
}
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,30 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_LIST_MAX_RESULTS;

public class TestGetContentSummary extends AbstractAbfsIntegrationTest {
private final String[] directories = {"testFolder", "testFolder/testFolder1",
"testFolder/testFolder2", "testFolder/testFolder3", "testFolderII",
"testFolder/testFolder2/testFolder4",
"testFolder/testFolder2/testFolder5",
"testFolder/testFolder3/testFolder6",
"testFolder/testFolder3/testFolder7"};

private final String[] directories = {"/testFolder",
"/testFolder/testFolder1",
"/testFolder/testFolder2", "/testFolder/testFolder3", "/testFolderII",
"/testFolder/testFolder2/testFolder4",
"/testFolder/testFolder2/testFolder5",
"/testFolder/testFolder3/testFolder6",
"/testFolder/testFolder3/testFolder7",
"/testFolder/testFolder3/testFolder6/leafDir",
"/testFolderII/listMaxDir",
"/testFolderII/listMaxDir/zFolder"};
//thread poll should not get interrupted before zFolder is put in queue

private final Path pathToFile = new Path("/testFolder/test1");;
private final Path pathToListMaxDir = new Path("/testFolderII/listMaxDir");
private final Path pathToLeafDir =
new Path("/testFolder/testFolder3/testFolder6/leafDir");
private final Path pathToIntermediateDirWithFilesOnly = new Path(
"/testFolder/testFolder2/testFolder5");
private final Path pathToIntermediateDirWithFilesAndSubdirs = new Path(
"/testFolder/testFolder3");
private final String[] dirsWithNonEmptyFiles = {"/testFolder", "/testFolder/testFolder1",
"/testFolder/testFolder2/testFolder5", "/testFolder/testFolder3"};

private final AzureBlobFileSystem fs = createFileSystem();
private final int testBufferSize = 20;
private final int filesPerDirectory = 2;
Expand All @@ -55,84 +73,49 @@ public TestGetContentSummary() throws Exception {
}

@Test
public void testFilesystemRoot() throws IOException {
public void testFilesystemRoot()
throws IOException {
int fileCount =
(directories.length - 2) * filesPerDirectory + numFilesForListMaxTest;
ContentSummary contentSummary = fs.getContentSummary(new Path("/"));
checkContentSummary(contentSummary, directories.length,
directories.length * filesPerDirectory, 0);
checkContentSummary(contentSummary, directories.length, fileCount,
dirsWithNonEmptyFiles.length * filesPerDirectory * testBufferSize);
}

@Test
public void testFileContentSummary() throws IOException {
Path filePath = new Path("/testFolderII/testFile");
FSDataOutputStream out = fs.create(filePath);
out.write(b);
out.close();
ContentSummary contentSummary = fs.getContentSummary(filePath);
ContentSummary contentSummary = fs.getContentSummary(pathToFile);
checkContentSummary(contentSummary, 0, 1, testBufferSize);
}

@Test
public void testLeafDir() throws IOException {
Path pathToLeafDir = new Path(
"/testFolder/testFolder2/testFolder4" + "/leafDir");
fs.mkdirs(pathToLeafDir);
ContentSummary contentSummary = fs.getContentSummary(pathToLeafDir);
checkContentSummary(contentSummary, 0, 0, 0);
}

@Test
public void testIntermediateDirWithFilesOnly() throws IOException {
String dirPath = "/testFolder/testFolder3/testFolder6";
for (int i = 0; i < filesPerDirectory; i++) {
FSDataOutputStream out = fs.append(new Path(dirPath + "/test" + i));
out.write(b);
out.close();
}
ContentSummary contentSummary = fs.getContentSummary(new Path(dirPath));
ContentSummary contentSummary =
fs.getContentSummary(pathToIntermediateDirWithFilesOnly);
checkContentSummary(contentSummary, 0, filesPerDirectory,
testBufferSize * filesPerDirectory);
}

@Test
public void testIntermediateDirWithFilesAndSubdirs() throws IOException {
Path dirPath = new Path("/testFolder/testFolder3");
for (int i = 0; i < filesPerDirectory; i++) {
FSDataOutputStream out = fs.append(new Path(dirPath + "/test" + i));
out.write(b);
out.close();
}
Path dir2Path = new Path("/testFolder/testFolder3/testFolder6");
for (int i = 0; i < filesPerDirectory; i++) {
FSDataOutputStream out = fs.append(new Path(dir2Path + "/test" + i));
out.write(b);
out.close();
}
ContentSummary contentSummary = fs.getContentSummary(dirPath);
checkContentSummary(contentSummary, 2, 3 * filesPerDirectory,
testBufferSize * 2 * 2);
}

@Test
public void testEmptyDir() throws IOException {
Path pathToEmptyDir = new Path("/testFolder/emptyDir");
fs.mkdirs(pathToEmptyDir);
ContentSummary contentSummary = fs.getContentSummary(pathToEmptyDir);
checkContentSummary(contentSummary, 0, 0, 0);
ContentSummary contentSummary =
fs.getContentSummary(pathToIntermediateDirWithFilesAndSubdirs);
checkContentSummary(contentSummary, 3, 3 * filesPerDirectory,
testBufferSize * filesPerDirectory);
}

@Test
public void testDirOverListMaxResultsItems()
throws IOException, ExecutionException, InterruptedException {
Path pathToDir = new Path("/testFolder/testFolder2/maxListDir");
fs.mkdirs(pathToDir);
populateDirWithFiles(pathToDir, numFilesForListMaxTest);
FSDataOutputStream out = fs.append(new Path(pathToDir + "/test0"));
out.write(b);
out.close();
checkContentSummary(
fs.getContentSummary(new Path("/testFolder" + "/testFolder2")), 3,
numFilesForListMaxTest + filesPerDirectory * 3,
testBufferSize);
fs.getContentSummary(pathToListMaxDir), 1,
numFilesForListMaxTest + filesPerDirectory, 0);
}

private void checkContentSummary(ContentSummary contentSummary,
Expand All @@ -150,10 +133,21 @@ private void checkContentSummary(ContentSummary contentSummary,
private void createDirectoryStructure()
throws IOException, ExecutionException, InterruptedException {
for (String directory : directories) {
Path dirPath = new Path("/" + directory);
Path dirPath = new Path(directory);
fs.mkdirs(dirPath);
populateDirWithFiles(dirPath, filesPerDirectory);
if (!(dirPath.equals(pathToLeafDir) || dirPath
.equals(pathToListMaxDir))) {
populateDirWithFiles(dirPath, filesPerDirectory);
}
}
for (String dir : dirsWithNonEmptyFiles) {
for (int i = 0; i < filesPerDirectory; i++) {
FSDataOutputStream out = fs.append(new Path(dir + "/test" + i));
out.write(b);
out.close();
}
}
populateDirWithFiles(pathToListMaxDir, numFilesForListMaxTest);
}

private void populateDirWithFiles(Path directory, int numFiles)
Expand Down