-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-17428. ABFS: Implementation for getContentSummary #2549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
8708856
01aba06
2876a8b
a9960da
30cf195
95d1396
03d342c
bb55b14
a9e94a9
06609da
1433c85
27b6007
d747f06
be2daf0
96cd2b9
e3eaca7
94a95df
48d0607
a10be00
636b434
bc276b2
744f8c4
9070413
657d7ea
041d9bc
9c92338
4be7b19
7a2e218
d21b58a
9b2723b
2378431
fe71af1
fa34b57
aa48086
f320785
be0e94c
2104268
a718cbd
c9d65aa
4003aff
3039f7f
8259a2e
b64b492
16d9436
137627d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * <p> | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * <p> | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.azurebfs.services; | ||
|
|
||
| import org.apache.hadoop.fs.FileStatus; | ||
sumangala-patki marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; | ||
| import org.apache.hadoop.fs.azurebfs.utils.ContentSummary; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| public class ContentSummaryProcessor { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: javadocs
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added description |
||
| private final AtomicLong fileCount = new AtomicLong(0L); | ||
| private final AtomicLong directoryCount = new AtomicLong(0L); | ||
| private final AtomicLong totalBytes = new AtomicLong(0L); | ||
| private final ProcessingQueue<FileStatus> queue = new ProcessingQueue<>(); | ||
| private final AzureBlobFileSystemStore abfsStore; | ||
| private static final int NUM_THREADS = 16; | ||
|
|
||
| public ContentSummaryProcessor(AzureBlobFileSystemStore abfsStore) { | ||
| this.abfsStore = abfsStore; | ||
| } | ||
|
|
||
| public ContentSummary getContentSummary(Path path) throws IOException { | ||
| processDirectoryTree(path); | ||
| Thread[] threads = new Thread[16]; | ||
|
|
||
| for (int i = 0; i < NUM_THREADS; ++i) { | ||
| threads[i] = new Thread(new ContentSummaryProcessor.ThreadProcessor()); | ||
| threads[i].start(); | ||
| } | ||
|
|
||
| for (Thread t : threads) { | ||
| try { | ||
| t.join(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
| return new ContentSummary(totalBytes.get(), directoryCount.get(), | ||
| fileCount.get(), totalBytes.get()); | ||
| } | ||
|
|
||
| private void processDirectoryTree(Path path) throws IOException { | ||
| FileStatus[] fileStatuses = abfsStore.listStatus(path); | ||
sumangala-patki marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
sumangala-patki marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for (FileStatus fileStatus : fileStatuses) { | ||
|
||
| if (fileStatus.isDirectory()) { | ||
| this.processDirectory(); | ||
| this.queue.add(fileStatus); | ||
| } else { | ||
| this.processFile(fileStatus); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void processDirectory() { | ||
| this.directoryCount.incrementAndGet(); | ||
| } | ||
|
|
||
| private void processFile(FileStatus fileStatus) { | ||
| this.fileCount.incrementAndGet(); | ||
| this.totalBytes.addAndGet(fileStatus.getLen()); | ||
| } | ||
|
|
||
| private final class ThreadProcessor implements Runnable { | ||
| private ThreadProcessor() { | ||
| } | ||
|
|
||
| public void run() { | ||
| try { | ||
| FileStatus fileStatus; | ||
| while ((fileStatus = ContentSummaryProcessor.this.queue.poll()) | ||
| != null) { | ||
| if (fileStatus.isDirectory()) { | ||
| ContentSummaryProcessor.this | ||
| .processDirectoryTree(fileStatus.getPath()); | ||
| } | ||
| ContentSummaryProcessor.this.queue.unregister(); | ||
| } | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("IOException processing Directory tree", e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * <p> | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * <p> | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.azurebfs.services; | ||
|
|
||
| import java.util.LinkedList; | ||
| import java.util.Queue; | ||
|
|
||
/**
 * A blocking work queue for a group of worker threads that both consume items
 * and may add new items while processing one.
 *
 * <p>Each item handed out by {@link #poll()} counts as "in progress" until the
 * worker calls {@link #unregister()}. The queue is finished when it is empty
 * and nothing is in progress; at that point {@code poll()} returns
 * {@code null} to every caller instead of blocking.
 *
 * <p>All public methods synchronize on this instance.
 *
 * @param <T> type of the queued items
 */
public class ProcessingQueue<T> {

  private final Queue<T> internalQueue = new LinkedList<>();
  /** Number of items handed out by poll() and not yet unregistered. */
  private int processorCount = 0;

  ProcessingQueue() {
  }

  /**
   * Appends an item and wakes any threads blocked in {@link #poll()}.
   *
   * @param item item to enqueue; must not be null
   * @throws IllegalArgumentException if {@code item} is null
   */
  public synchronized void add(T item) {
    if (item == null) {
      throw new IllegalArgumentException("Cannot put null into queue");
    }
    internalQueue.add(item);
    notifyAll();
  }

  /**
   * Takes the next item, blocking while the queue is empty but some item is
   * still in progress (its worker may yet add more work).
   *
   * @return the next item, which the caller must later balance with
   *         {@link #unregister()}; or {@code null} if all work is finished or
   *         the calling thread was interrupted while waiting (the interrupt
   *         flag is restored in that case)
   */
  public synchronized T poll() {
    try {
      while (isQueueEmpty() && !done()) {
        wait();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    if (isQueueEmpty()) {
      return null;
    }
    ++processorCount;
    return internalQueue.poll();
  }

  /**
   * Marks one previously polled item as fully processed and, if that leaves
   * the queue finished, wakes all threads blocked in {@link #poll()}.
   *
   * @throws IllegalStateException if called more times than {@code poll()}
   *         has returned an item; the internal count is left unchanged
   */
  public synchronized void unregister() {
    // Validate before decrementing: a negative count could never satisfy
    // done(), which would leave waiting pollers blocked forever.
    if (processorCount <= 0) {
      throw new IllegalStateException(
          "too many unregister()'s. processorCount is now "
              + processorCount);
    }
    --processorCount;
    if (done()) {
      notifyAll();
    }
  }

  /** True when nothing is queued and nothing is in progress. */
  private boolean done() {
    return processorCount == 0 && isQueueEmpty();
  }

  private boolean isQueueEmpty() {
    // Nulls are rejected by add(), so a null head means the queue is empty.
    return internalQueue.peek() == null;
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * <p> | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * <p> | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.azurebfs.utils; | ||
|
|
||
/**
 * Immutable holder for the four figures produced by a content-summary
 * computation: length, directory count, file count and space consumed.
 */
public class ContentSummary {
  private final long length;
  private final long directoryCount;
  private final long fileCount;
  private final long spaceConsumed;

  /**
   * Stores the given values unchanged; no validation is performed.
   *
   * @param length value returned by {@link #getLength()}
   * @param directoryCount value returned by {@link #getDirectoryCount()}
   * @param fileCount value returned by {@link #getFileCount()}
   * @param spaceConsumed value returned by {@link #getSpaceConsumed()}
   */
  public ContentSummary(long length, long directoryCount, long fileCount,
      long spaceConsumed) {
    this.fileCount = fileCount;
    this.directoryCount = directoryCount;
    this.spaceConsumed = spaceConsumed;
    this.length = length;
  }

  /** @return the length supplied at construction */
  public long getLength() {
    return this.length;
  }

  /** @return the directory count supplied at construction */
  public long getDirectoryCount() {
    return this.directoryCount;
  }

  /** @return the file count supplied at construction */
  public long getFileCount() {
    return this.fileCount;
  }

  /** @return the space-consumed figure supplied at construction */
  public long getSpaceConsumed() {
    return this.spaceConsumed;
  }
}
Uh oh!
There was an error while loading. Please reload this page.