Changes from all commits
45 commits
8708856
add getContentSummary and prelim test
sumangala17 Dec 11, 2020
01aba06
remove gfs call
sumangala17 Dec 14, 2020
2876a8b
add tests
sumangala17 Dec 15, 2020
a9960da
pr draft
sumangala17 Dec 15, 2020
30cf195
checkstyle fix
sumangala17 Dec 16, 2020
95d1396
linkedBlockingQ + junit test fix
sumangala17 Dec 17, 2020
03d342c
linkedBlockingQ + junit test fix (#5)
sumangala-patki Dec 17, 2020
bb55b14
using executors
sumangala17 Dec 22, 2020
a9e94a9
using executors
sumangala17 Dec 22, 2020
06609da
run()->call(), terminate condition, add invalid path test
sumangala17 Dec 24, 2020
1433c85
pr revw + checkstyle
sumangala17 Dec 24, 2020
27b6007
Merge branch 'trunk' into HADOOP-17428
sumangala17 Dec 24, 2020
d747f06
findbugs use future returned
sumangala17 Dec 24, 2020
be2daf0
completion service + temp concurrency tests
sumangala17 Jan 5, 2021
96cd2b9
pr revw + exec test
sumangala17 Jan 7, 2021
e3eaca7
clean up
sumangala17 Jan 8, 2021
94a95df
minor changes
sumangala17 Jan 8, 2021
48d0607
rm thread test
sumangala17 Jan 9, 2021
a10be00
checkstyle
sumangala17 Jan 10, 2021
636b434
Merge branch 'trunk' into HADOOP-17428
sumangala17 Jan 10, 2021
bc276b2
revw changes + doc
sumangala17 Jan 12, 2021
744f8c4
javadoc
sumangala17 Jan 12, 2021
9070413
trigger yetus
sumangala17 Jan 12, 2021
657d7ea
Merge branch 'trunk' into HADOOP-17428
sumangala17 Feb 7, 2021
041d9bc
use listingsupport to abstract store
sumangala17 Feb 7, 2021
9c92338
merge
sumangala17 Feb 22, 2021
4be7b19
checkstyle
sumangala17 Feb 22, 2021
7a2e218
Merge branch 'trunk' into HADOOP-17428
sumangala17 Feb 22, 2021
d21b58a
import order
sumangala17 Feb 22, 2021
9b2723b
Merge branch 'trunk' into HADOOP-17428
sumangala17 Mar 31, 2021
2378431
log ex
sumangala17 Apr 19, 2021
fe71af1
Merge branch 'trunk' into HADOOP-17428
sumangala17 May 11, 2021
fa34b57
rm abfs cs
sumangala17 May 11, 2021
aa48086
test fix
sumangala17 May 12, 2021
f320785
clean up
sumangala17 May 12, 2021
be0e94c
merge with tc
sumangala17 Jul 5, 2021
2104268
Merge branch 'trunk' into HADOOP-17428
sumangala17 Aug 23, 2021
a718cbd
address revw comments
sumangala17 Aug 25, 2021
c9d65aa
review comments part 2: move executor->abfsStore
sumangala17 Aug 26, 2021
4003aff
Merge branch 'trunk' into HADOOP-17428
sumangala17 Sep 8, 2021
3039f7f
use iterator + rejected-ex handler
sumangala17 Sep 10, 2021
8259a2e
undo extra formatting
sumangala17 Sep 13, 2021
b64b492
more formatting
sumangala17 Sep 13, 2021
16d9436
format
sumangala17 Sep 13, 2021
137627d
fix merge conflict
sumangala17 Jan 10, 2022
AbfsConfiguration.java
@@ -992,7 +992,7 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {

public int getWriteMaxConcurrentRequestCount() {
if (this.writeMaxConcurrentRequestCount < 1) {
return 4 * Runtime.getRuntime().availableProcessors();
return 4 * getAvailableProcessorCount();
}
return this.writeMaxConcurrentRequestCount;
}
@@ -1013,6 +1013,10 @@ public String getClientProvidedEncryptionKey() {
return rawConfig.get(accSpecEncKey, null);
}

public static int getAvailableProcessorCount() {
return Runtime.getRuntime().availableProcessors();
}

@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
AzureBlobFileSystem.java
@@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
@@ -57,6 +58,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -82,6 +84,7 @@
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsLocatedFileStatus;
import org.apache.hadoop.fs.azurebfs.services.ContentSummaryProcessor;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
@@ -484,6 +487,33 @@ public boolean delete(final Path f, final boolean recursive) throws IOException

}

/**
* Returns a ContentSummary instance containing the count of directories,
* files and total number of bytes under a given path.
*
* @param path The given path
* @return ContentSummary
* @throws IOException if an error is encountered during listStatus calls
* or if there is any issue with the thread pool used
* while processing
*/
@Override
public ContentSummary getContentSummary(Path path) throws IOException {
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.GET_CONTENT_SUMMARY, true,
tracingHeaderFormat, listener);
return (new ContentSummaryProcessor(abfsStore)).getContentSummary(path,
tracingContext);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted");
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException ex) {
LOG.debug("GetContentSummary failed with error: {}", ex.getMessage());
throw new PathIOException(path.toString(), ex);
}
}

@Override
public FileStatus[] listStatus(final Path f) throws IOException {
LOG.debug(
@@ -1192,9 +1222,8 @@ public RemoteIterator<FileStatus> listStatusIterator(Path path)
if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.LISTSTATUS, true, tracingHeaderFormat, listener);
AbfsListStatusRemoteIterator abfsLsItr =
new AbfsListStatusRemoteIterator(getFileStatus(path, tracingContext), abfsStore,
tracingContext);
AbfsListStatusRemoteIterator abfsLsItr = new AbfsListStatusRemoteIterator(
path, abfsStore, tracingContext);
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
} else {
return super.listStatusIterator(path);
@@ -1516,7 +1545,7 @@ Map<String, Long> getInstrumentationMap() {
}

@VisibleForTesting
String getFileSystemId() {
public String getFileSystemId() {
return fileSystemId;
}

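For context, a minimal caller-side sketch of the new API (the account, container, and path below are hypothetical; any valid abfs:// URI works):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetContentSummaryExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical container URI; resolves to AzureBlobFileSystem.
    Path root = new Path("abfs://container@account.dfs.core.windows.net/data");
    FileSystem fs = FileSystem.get(root.toUri(), new Configuration());
    ContentSummary summary = fs.getContentSummary(root);
    System.out.println("dirs=" + summary.getDirectoryCount()
        + " files=" + summary.getFileCount()
        + " bytes=" + summary.getLength());
  }
}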
AzureBlobFileSystemStore.java
@@ -52,6 +52,8 @@
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
@@ -173,6 +175,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private final IdentityTransformerInterface identityTransformer;
private final AbfsPerfTracker abfsPerfTracker;
private final AbfsCounters abfsCounters;
private final ThreadPoolExecutor contentSummaryExecutorService;

/**
* The set of directories where we should store files as append blobs.
@@ -256,6 +259,17 @@ public AzureBlobFileSystemStore(
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
}
contentSummaryExecutorService = new ThreadPoolExecutor(0,
4 * AbfsConfiguration.getAvailableProcessorCount(), 60,
TimeUnit.SECONDS, new SynchronousQueue<>());
contentSummaryExecutorService.setRejectedExecutionHandler(
(runnable, threadPoolExecutor) -> {
try {
contentSummaryExecutorService.getQueue().put(runnable);
} catch (InterruptedException e) {
LOG.debug("Could not submit GetContentSummary task to thread pool");
}
});
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
@@ -1734,6 +1748,10 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) {
return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
}

public ExecutorService getContentSummaryExecutorService() {
return contentSummaryExecutorService;
}

/**
* A File status with version info extracted from the etag value returned
* in a LIST or HEAD request.
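The store's new executor pairs a SynchronousQueue with a rejection handler that re-submits via a blocking put(), so once all max-pool threads are busy a submitter waits for a free worker instead of getting a RejectedExecutionException. A standalone sketch of the same pattern (pool bounds and class name here are arbitrary, not the PR's values):

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockOnRejectionDemo {
  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        0, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
    // On rejection, block the submitting thread until a worker
    // becomes free and accepts the handoff.
    pool.setRejectedExecutionHandler((runnable, executor) -> {
      try {
        executor.getQueue().put(runnable);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    for (int i = 0; i < 8; i++) {
      final int id = i;
      pool.execute(() -> System.out.println("task " + id));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}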
FSOperationType.java
@@ -28,6 +28,7 @@ public enum FSOperationType {
DELETE("DL"),
GET_ACL_STATUS("GA"),
GET_ATTR("GR"),
GET_CONTENT_SUMMARY("GC"),
GET_FILESTATUS("GF"),
LISTSTATUS("LS"),
MKDIR("MK"),
AbfsListStatusRemoteIterator.java
@@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

@@ -45,7 +46,7 @@ public class AbfsListStatusRemoteIterator
private static final int MAX_QUEUE_SIZE = 10;
private static final long POLL_WAIT_TIME_IN_MS = 250;

private final FileStatus fileStatus;
private final Path path;
private final ListingSupport listingSupport;
private final ArrayBlockingQueue<AbfsListResult> listResultQueue;
private final TracingContext tracingContext;
@@ -55,9 +56,9 @@ public class AbfsListStatusRemoteIterator
private String continuation;
private Iterator<FileStatus> currIterator;

public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
public AbfsListStatusRemoteIterator(final Path path,
final ListingSupport listingSupport, TracingContext tracingContext) {
this.fileStatus = fileStatus;
this.path = path;
this.listingSupport = listingSupport;
this.tracingContext = tracingContext;
listResultQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
@@ -144,7 +145,7 @@ private synchronized void addNextBatchIteratorToQueue()
throws IOException, InterruptedException {
List<FileStatus> fileStatuses = new ArrayList<>();
continuation = listingSupport
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
.listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
continuation, tracingContext);
if (!fileStatuses.isEmpty()) {
listResultQueue.put(new AbfsListResult(fileStatuses.iterator()));
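Taking a Path instead of a FileStatus means listStatusIterator no longer needs an upfront getFileStatus call before listing can start. Consumption through the public API is unchanged; a small sketch (the helper below is illustrative, not part of the patch):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListingExample {
  // Prints one directory level using the iterator-based listing API.
  static void listDir(FileSystem fs, Path dir) throws IOException {
    RemoteIterator<FileStatus> it = fs.listStatusIterator(dir);
    while (it.hasNext()) {
      FileStatus st = it.next();
      System.out.println((st.isDirectory() ? "dir  " : "file ") + st.getPath());
    }
  }
}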
ContentSummaryProcessor.java (new file)
@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

/**
* Class to carry out parallelized recursive listing on a given path to
* collect directory and file count/size information, as part of the
* implementation of the FileSystem method getContentSummary.
*/
public class ContentSummaryProcessor {
Contributor: nit: javadocs

Contributor Author: added description
private static final int MAX_THREAD_COUNT = 16;
private static final int POLL_TIMEOUT = 100;
private static final Logger LOG = LoggerFactory.getLogger(ContentSummaryProcessor.class);
private final AtomicLong fileCount = new AtomicLong(0L);
private final AtomicLong directoryCount = new AtomicLong(0L);
private final AtomicLong totalBytes = new AtomicLong(0L);
private final AtomicInteger numTasks = new AtomicInteger(0);
private final ListingSupport abfsStore;
private final ExecutorService executorService;
private final CompletionService<Void> completionService;
private final LinkedBlockingQueue<FileStatus> queue = new LinkedBlockingQueue<>();

/**
* Processes a given path for count of subdirectories, files and total number
* of bytes.
* @param abfsStore Instance of AzureBlobFileSystemStore, used to make
* listStatus calls to the server
*/
public ContentSummaryProcessor(ListingSupport abfsStore) {
this.abfsStore = abfsStore;
this.executorService = ((AzureBlobFileSystemStore) abfsStore).getContentSummaryExecutorService();
completionService = new ExecutorCompletionService<>(this.executorService);
}

public ContentSummary getContentSummary(Path path,
TracingContext tracingContext)
throws IOException, ExecutionException, InterruptedException {

processDirectoryTree(path, tracingContext);
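// One take() per submitted task: numTasks is incremented once per submit
// (in conditionalSubmitTaskToExecutor) and decremented exactly once per
// finished task in the finally block; get() rethrows worker exceptions.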
while (!queue.isEmpty() || numTasks.get() > 0) {
try {
completionService.take().get();
} finally {
numTasks.decrementAndGet();

Contributor: Are we decrementing twice on the last task - once here and once in finally?

LOG.debug(
"FileStatus queue size = {}, number of submitted unfinished tasks"
+ " = {}, active thread count = {}",
queue.size(), numTasks,
((ThreadPoolExecutor) executorService).getActiveCount());
}
}

LOG.debug("Processed content summary of subtree under given path");
ContentSummary.Builder builder =
new ContentSummary.Builder().directoryCount(
directoryCount.get()).fileCount(fileCount.get())
.length(totalBytes.get()).spaceConsumed(totalBytes.get());
return builder.build();
}

/**
* Calls listStatus on the given path and populates the FileStatus queue
* with subdirectories. Called by new tasks to process the complete subtree
* under a given path.
* @param fileStatus path to a file or directory
* @throws IOException listStatus error
* @throws InterruptedException error while inserting into queue
*/
private void processDirectoryTree(Path fileStatus,
TracingContext tracingContext) throws IOException, InterruptedException {
AbfsListStatusRemoteIterator iterator = new AbfsListStatusRemoteIterator(
fileStatus, abfsStore, tracingContext);
while (iterator.hasNext()) {
FileStatus status = iterator.next();
if (status.isDirectory()) {
queue.put(status);
processDirectory();
conditionalSubmitTaskToExecutor(tracingContext);
} else {
processFile(status);
}
}
}

private void processDirectory() {
directoryCount.incrementAndGet();
}

/**
* Increments file count and byte count.
* @param fileStatus provides file size to update byte count
*/
private void processFile(FileStatus fileStatus) {
fileCount.incrementAndGet();
totalBytes.addAndGet(fileStatus.getLen());
}

/**
* Submits a task for processing a subdirectory, based on the current size
* of the FileStatus queue and the number of already submitted tasks.
*/
private synchronized void conditionalSubmitTaskToExecutor(TracingContext tracingContext) {
if (!queue.isEmpty() && numTasks.get() < MAX_THREAD_COUNT) {
numTasks.incrementAndGet();
Contributor (@bilaharith, Jan 11, 2021): See if there is a method for a simple increment, since you are not using the return value.

Contributor Author: Can't find an op that doesn't return a value; will have to stick with this.

completionService.submit(() -> {
FileStatus fileStatus1;
while ((fileStatus1 = queue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS))
!= null) {
processDirectoryTree(fileStatus1.getPath(),
new TracingContext(tracingContext));
}
return null;
});
}
}

}
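The processor's core concurrency shape, counting submitted tasks and draining an ExecutorCompletionService with take().get() so worker exceptions propagate to the caller, can be shown in isolation. A simplified sketch, not the PR's code:

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionServiceDemo {
  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
    AtomicInteger pending = new AtomicInteger(0);
    for (int i = 0; i < 10; i++) {
      pending.incrementAndGet();
      cs.submit(() -> null); // stand-in for a subtree-listing task
    }
    // One take() per submitted task; get() rethrows a worker failure
    // wrapped in ExecutionException.
    while (pending.get() > 0) {
      try {
        cs.take().get();
      } finally {
        pending.decrementAndGet();
      }
    }
    pool.shutdown();
  }
}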