fileStatuses = new ArrayList<>();
continuation = abfsStore
.listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
continuation);
- isFirstRead = false;
- if (!fileStatuses.isEmpty()) {
- this.allFileStatuses.add(fileStatuses);
+ if (fileStatuses != null && !fileStatuses.isEmpty()) {
+ iterators.add(fileStatuses.listIterator());
+ }
+ if (firstRead) {
+ firstRead = false;
}
return null;
- } catch (IOException e) {
- return e;
+ } catch (IOException ex) {
+ return ex;
}
});
}
+ private void forceFuture() throws IOException {
+ if (future == null) {
+ return;
+ }
+ IOException ex = awaitFuture(future);
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ private boolean isIterationComplete() {
+ return !firstRead && (continuation == null || continuation.isEmpty());
+ }
+
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
index 81c29e67940e2..9ba1c1676054a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,36 +18,33 @@
package org.apache.hadoop.fs.azurebfs;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
+import org.mockito.Mockito;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.rename;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test listStatus operation.
*/
-public class ITestAzureBlobFileSystemListStatusIterator extends
- AbstractAbfsIntegrationTest {
+public class ITestAzureBlobFileSystemListStatusIterator
+ extends AbstractAbfsIntegrationTest {
+
private static final int TEST_FILES_NUMBER = 100;
public ITestAzureBlobFileSystemListStatusIterator() throws Exception {
@@ -57,36 +54,59 @@ public ITestAzureBlobFileSystemListStatusIterator() throws Exception {
@Test
public void testListPath() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
- final List> tasks = new ArrayList<>();
+ String rootPath = "testRoot1";
+ final List fileNames = createFiles(TEST_FILES_NUMBER, rootPath,
+ "testListPath");
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
+ abfsStore.getAbfsConfiguration().setListMaxResults(10);
+ RemoteIterator fsIt = fs.listStatusIterator(new Path(rootPath));
+ int itrCount = 0;
+ while (fsIt.hasNext()) {
+ FileStatus fileStatus = fsIt.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ assertEquals(TEST_FILES_NUMBER, itrCount);
+ assertEquals(0, fileNames.size());
+ }
+
+ @Test(expected = NoSuchElementException.class)
+ public void testNextWhenNoMoreElementsPresent() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String rootPathStr = "testRoot2";
+ Path rootPath = new Path(rootPathStr);
+ getFileSystem().create(rootPath);
+ RemoteIterator fsItr = fs.listStatusIterator(rootPath);
+ fsItr = Mockito.spy(fsItr);
+ Mockito.doReturn(false).when(fsItr).hasNext();
+ fsItr.next();
+ }
+ private List createFiles(int numFiles, String rootPathStr,
+ String filenamePrefix)
+ throws ExecutionException, InterruptedException, IOException {
+ final List> tasks = new ArrayList<>();
+ final List fileNames = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(10);
- for (int i = 0; i < TEST_FILES_NUMBER; i++) {
- final Path fileName = new Path("testRoot/test" + i);
+ final Path rootPath = new Path(rootPathStr);
+ for (int i = 0; i < numFiles; i++) {
+ final Path filePath = new Path(rootPath, filenamePrefix + i);
Callable callable = new Callable() {
@Override
public Void call() throws Exception {
- touch(fileName);
+ getFileSystem().create(filePath);
+ fileNames.add(makeQualified(filePath).toString());
return null;
}
};
-
tasks.add(es.submit(callable));
}
-
for (Future task : tasks) {
task.get();
}
-
es.shutdownNow();
-
- AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
- abfsStore.getAbfsConfiguration().setListMaxResults(10);
- RemoteIterator fsIt = fs.listStatusIterator(new Path(
- "user/bith/testRoot"));
- while(fsIt.hasNext()){
- FileStatus a = fsIt.next();
- System.out.println(a.getPath().toString().substring(50));
- }
+ return fileNames;
}
private AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
From 13ddf5274a425bf83c8b25176f53b9e10be03102 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 30 Dec 2020 16:33:16 +0530
Subject: [PATCH 03/17] Making the server calls as a background activity
---
.../services/ListStatusRemoteIterator.java | 115 +++++++++++-------
1 file changed, 71 insertions(+), 44 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
index f80927b80fa74..6fc4ebe1533a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
@@ -9,48 +27,46 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
-import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
-
public class ListStatusRemoteIterator implements RemoteIterator {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ListStatusRemoteIterator.class);
+
private static final boolean FETCH_ALL_FALSE = false;
private final Path path;
private final AzureBlobFileSystemStore abfsStore;
- private final Queue> iterators = new LinkedList<>();
+ private final Queue> iteratorsQueue =
+ new LinkedList<>();
private boolean firstRead = true;
- private CompletableFuture future;
private String continuation;
- private ListIterator lsItr;
+ private ListIterator currIterator;
+ private IOException ioException;
public ListStatusRemoteIterator(final Path path,
final AzureBlobFileSystemStore abfsStore) throws IOException {
this.path = path;
this.abfsStore = abfsStore;
- fetchMoreFileStatusesAsync();
- fetchMoreFileStatusesAsync();
- lsItr = iterators.poll();
+ fetchAllAsync();
+ updateCurrentIterator();
}
@Override
public boolean hasNext() throws IOException {
- if (lsItr.hasNext()) {
+ if (currIterator.hasNext()) {
return true;
}
- fetchMoreFileStatusesAsync();
- if (!iterators.isEmpty()) {
- lsItr = iterators.poll();
- if (lsItr == null) {
- return false;
- }
- }
- return lsItr.hasNext();
+ updateCurrentIterator();
+ return currIterator.hasNext();
}
@Override
@@ -58,43 +74,54 @@ public FileStatus next() throws IOException {
if (!this.hasNext()) {
throw new NoSuchElementException();
}
- return lsItr.next();
+ return currIterator.next();
}
- private void fetchMoreFileStatusesAsync() throws IOException {
- forceFuture();
- if (isIterationComplete()) {
- return;
+ private void updateCurrentIterator() throws IOException {
+ synchronized (this) {
+ while (!isIterationComplete() && iteratorsQueue.isEmpty()) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ }
+ }
+ if (!iteratorsQueue.isEmpty()) {
+ currIterator = iteratorsQueue.poll();
+ } else if (ioException != null) {
+ throw ioException;
+ }
}
- this.future = CompletableFuture.supplyAsync(() -> {
- try {
+ }
+
+ private void fetchAllAsync() {
+ CompletableFuture.supplyAsync(() -> {
+ while (!isIterationComplete()) {
List fileStatuses = new ArrayList<>();
- continuation = abfsStore
- .listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
- continuation);
- if (fileStatuses != null && !fileStatuses.isEmpty()) {
- iterators.add(fileStatuses.listIterator());
+ try {
+ continuation = abfsStore
+ .listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
+ continuation);
+ } catch (IOException e) {
+ ioException = e;
+ return null;
+ } finally {
+ if (firstRead) {
+ firstRead = false;
+ }
}
- if (firstRead) {
- firstRead = false;
+ if (fileStatuses != null && !fileStatuses.isEmpty()) {
+ iteratorsQueue.add(fileStatuses.listIterator());
+ synchronized (this) {
+ this.notifyAll();
+ }
}
- return null;
- } catch (IOException ex) {
- return ex;
}
+ return null;
});
}
- private void forceFuture() throws IOException {
- if (future == null) {
- return;
- }
- IOException ex = awaitFuture(future);
- if (ex != null) {
- throw ex;
- }
- }
-
private boolean isIterationComplete() {
return !firstRead && (continuation == null || continuation.isEmpty());
}
From fc40333f784888c1c381b07472a077d455fd499a Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 30 Dec 2020 20:26:54 +0530
Subject: [PATCH 04/17] Making the async call exit if the queue is full. Adding
more test cases.
---
.../fs/azurebfs/AzureBlobFileSystemStore.java | 9 +-
.../services/ListStatusRemoteIterator.java | 112 +++++++++++-------
...AzureBlobFileSystemListStatusIterator.java | 95 +++++++++++++--
3 files changed, 152 insertions(+), 64 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index e4866f3d7bac7..d6a2b9a36c772 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -114,7 +114,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.client.utils.URIBuilder;
-import static org.apache.commons.lang3.ArrayUtils.toArray;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
@@ -872,12 +871,12 @@ public String listStatus(final Path path, final String startFrom,
final String relativePath = getRelativePath(path);
- if(continuation==null ||continuation.length()<1) {
+ if (continuation == null || continuation.length() < 1) {
// generate continuation token if a valid startFrom is provided.
if (startFrom != null && !startFrom.isEmpty()) {
- continuation = getIsNamespaceEnabled() ?
- generateContinuationTokenForXns(startFrom) :
- generateContinuationTokenForNonXns(relativePath, startFrom);
+ continuation = getIsNamespaceEnabled()
+ ? generateContinuationTokenForXns(startFrom)
+ : generateContinuationTokenForNonXns(relativePath, startFrom);
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
index 6fc4ebe1533a4..919d82352c826 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
@@ -20,11 +20,11 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import java.util.NoSuchElementException;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
@@ -41,23 +41,26 @@ public class ListStatusRemoteIterator implements RemoteIterator {
.getLogger(ListStatusRemoteIterator.class);
private static final boolean FETCH_ALL_FALSE = false;
+ private static final int MAX_QUEUE_SIZE = 10;
private final Path path;
private final AzureBlobFileSystemStore abfsStore;
- private final Queue> iteratorsQueue =
- new LinkedList<>();
+ private final ArrayBlockingQueue> iteratorsQueue;
+ private final Object asyncOpLock = new Object();
- private boolean firstRead = true;
+ private boolean firstBatch = true;
+ private boolean isAsyncInProgress = false;
private String continuation;
- private ListIterator currIterator;
+ private Iterator currIterator;
private IOException ioException;
public ListStatusRemoteIterator(final Path path,
- final AzureBlobFileSystemStore abfsStore) throws IOException {
+ final AzureBlobFileSystemStore abfsStore) {
this.path = path;
this.abfsStore = abfsStore;
- fetchAllAsync();
- updateCurrentIterator();
+ iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
+ currIterator = Collections.emptyIterator();
+ fetchBatchesAsync();
}
@Override
@@ -78,52 +81,69 @@ public FileStatus next() throws IOException {
}
private void updateCurrentIterator() throws IOException {
+ fetchBatchesAsync();
synchronized (this) {
- while (!isIterationComplete() && iteratorsQueue.isEmpty()) {
- try {
- this.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Thread got interrupted: {}", e);
+ if (iteratorsQueue.isEmpty()) {
+ if (ioException != null) {
+ throw ioException;
+ }
+ if (isListingComplete()) {
+ return;
}
}
- if (!iteratorsQueue.isEmpty()) {
- currIterator = iteratorsQueue.poll();
- } else if (ioException != null) {
- throw ioException;
+ }
+ try {
+ currIterator = iteratorsQueue.take();
+ if (!currIterator.hasNext() && !isListingComplete()) {
+ updateCurrentIterator();
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
}
}
- private void fetchAllAsync() {
- CompletableFuture.supplyAsync(() -> {
- while (!isIterationComplete()) {
- List fileStatuses = new ArrayList<>();
- try {
- continuation = abfsStore
- .listStatus(path, null, fileStatuses, FETCH_ALL_FALSE,
- continuation);
- } catch (IOException e) {
- ioException = e;
- return null;
- } finally {
- if (firstRead) {
- firstRead = false;
- }
- }
- if (fileStatuses != null && !fileStatuses.isEmpty()) {
- iteratorsQueue.add(fileStatuses.listIterator());
- synchronized (this) {
- this.notifyAll();
- }
- }
+ private boolean isListingComplete() {
+ return !firstBatch && (continuation == null || continuation.isEmpty());
+ }
+
+ private void fetchBatchesAsync() {
+ CompletableFuture.runAsync(() -> asyncOp());
+ }
+
+ private void asyncOp() {
+ if (isAsyncInProgress) {
+ return;
+ }
+ synchronized (asyncOpLock) {
+ if (isAsyncInProgress) {
+ return;
}
- return null;
- });
+ isAsyncInProgress = true;
+ }
+ try {
+ while (!isListingComplete() && iteratorsQueue.size() < MAX_QUEUE_SIZE) { // stop once the queue is full so put() is not left blocking
+ addNextBatchIteratorToQueue();
+ }
+ } catch (IOException e) {
+ ioException = e;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ } finally {
+ isAsyncInProgress = false;
+ }
}
- private boolean isIterationComplete() {
- return !firstRead && (continuation == null || continuation.isEmpty());
+ private synchronized void addNextBatchIteratorToQueue()
+ throws IOException, InterruptedException {
+ List fileStatuses = new ArrayList<>();
+ continuation = abfsStore
+ .listStatus(path, null, fileStatuses, FETCH_ALL_FALSE, continuation);
+ iteratorsQueue.put(fileStatuses.iterator());
+ if (firstBatch) {
+ firstBatch = false;
+ }
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
index 9ba1c1676054a..fb99988ee9a8e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
@@ -18,26 +18,29 @@
package org.apache.hadoop.fs.azurebfs;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.junit.Test;
-import org.mockito.Mockito;
-
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.azurebfs.services.ListStatusRemoteIterator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
/**
* Test listStatus operation.
@@ -45,7 +48,7 @@
public class ITestAzureBlobFileSystemListStatusIterator
extends AbstractAbfsIntegrationTest {
- private static final int TEST_FILES_NUMBER = 100;
+ private static final int TEST_FILES_NUMBER = 1000;
public ITestAzureBlobFileSystemListStatusIterator() throws Exception {
super();
@@ -71,16 +74,82 @@ public void testListPath() throws Exception {
assertEquals(0, fileNames.size());
}
- @Test(expected = NoSuchElementException.class)
+ @Test
public void testNextWhenNoMoreElementsPresent() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
String rootPathStr = "testRoot2";
Path rootPath = new Path(rootPathStr);
- getFileSystem().create(rootPath);
+ getFileSystem().mkdirs(rootPath);
RemoteIterator fsItr = fs.listStatusIterator(rootPath);
fsItr = Mockito.spy(fsItr);
Mockito.doReturn(false).when(fsItr).hasNext();
- fsItr.next();
+
+ RemoteIterator finalFsItr = fsItr;
+ Assertions.assertThatThrownBy(() -> finalFsItr.next()).describedAs(
+ "next() should throw NoSuchElementException if hasNext() return "
+ + "false").isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ public void testHasNextForEmptyDir() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String rootPathStr = "testRoot3";
+ Path rootPath = new Path(rootPathStr);
+ getFileSystem().mkdirs(rootPath);
+ RemoteIterator fsItr = fs.listStatusIterator(rootPath);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns false for empty directory").isFalse();
+ }
+
+ @Test
+ public void testHasNextForFile() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String rootPathStr = "testRoot4";
+ Path rootPath = new Path(rootPathStr);
+ getFileSystem().create(rootPath);
+ RemoteIterator fsItr = fs.listStatusIterator(rootPath);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns true for file").isTrue();
+ }
+
+ @Test
+ public void testHasNextForIOException() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String rootPathStr = "testRoot5";
+ Path rootPath = new Path(rootPathStr);
+ getFileSystem().mkdirs(rootPath);
+ ListStatusRemoteIterator fsItr = (ListStatusRemoteIterator) fs
+ .listStatusIterator(rootPath);
+ Thread.sleep(1000);
+
+ String exceptionMessage = "test exception";
+ setPrivateField(fsItr, ListStatusRemoteIterator.class, "ioException",
+ new IOException(exceptionMessage));
+ setPrivateFinalField(fsItr, ListStatusRemoteIterator.class,
+ "iteratorsQueue", new ArrayBlockingQueue(1));
+
+ Assertions.assertThatThrownBy(() -> fsItr.hasNext()).describedAs(
+ "When ioException is not null and queue is empty exception should be "
+ + "thrown").isInstanceOf(IOException.class)
+ .hasMessage(exceptionMessage);
+ }
+
+ private void setPrivateField(Object obj, Class classObj, String fieldName,
+ Object value) throws NoSuchFieldException, IllegalAccessException {
+ Field field = classObj.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(obj, value);
+ }
+
+ private void setPrivateFinalField(Object obj, Class classObj,
+ String fieldName, Object value)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = classObj.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(obj, value);
}
private List createFiles(int numFiles, String rootPathStr,
From 83ec324f866e78bbacd4416f13d607e5b901b374 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 13 Jan 2021 15:06:55 +0530
Subject: [PATCH 05/17] Addressing review comments
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 13 +
.../fs/azurebfs/AzureBlobFileSystem.java | 15 +-
.../fs/azurebfs/AzureBlobFileSystemStore.java | 8 +-
.../azurebfs/constants/ConfigurationKeys.java | 2 +
.../constants/FileSystemConfigurations.java | 2 +
...java => AbfsListStatusRemoteIterator.java} | 31 +-
.../fs/azurebfs/services/ListingSupport.java | 76 ++++
.../azurebfs/ITestAbfsListStatusIterator.java | 328 ++++++++++++++++++
...AzureBlobFileSystemListStatusIterator.java | 190 ----------
9 files changed, 454 insertions(+), 211 deletions(-)
rename hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/{ListStatusRemoteIterator.java => AbfsListStatusRemoteIterator.java} (87%)
create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
delete mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 5a70323395334..e9cb6ba5dc3b0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -275,6 +275,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
private long sasTokenRenewPeriodForStreamsInSeconds;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
+ private boolean enableAbfsListIterator;
+
public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
@@ -896,6 +900,10 @@ public int getMaxWriteRequestsToQueue() {
return this.maxWriteRequestsToQueue;
}
+ public boolean enableAbfsListIterator() {
+ return this.enableAbfsListIterator;
+ }
+
@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
@@ -961,4 +969,9 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}
+ @VisibleForTesting
+ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
+ this.enableAbfsListIterator = enableAbfsListIterator;
+ }
+
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index d391ac947aab1..f6f8186053659 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -37,16 +37,17 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.azurebfs.services.ListStatusRemoteIterator;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.functional.RemoteIterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -988,8 +989,14 @@ public boolean exists(Path f) throws IOException {
public RemoteIterator listStatusIterator(Path path)
throws IOException {
LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path);
- Path qualifiedPath = makeQualified(path);
- return new ListStatusRemoteIterator(qualifiedPath, abfsStore);
+ if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
+ Path qualifiedPath = makeQualified(path);
+ AbfsListStatusRemoteIterator abfsLsItr =
+ new AbfsListStatusRemoteIterator(qualifiedPath, abfsStore);
+ return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
+ } else {
+ return super.listStatusIterator(path);
+ }
}
private FileStatus tryGetFileStatus(final Path f) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 3ce746eea8178..f4be159bf9976 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -102,6 +102,7 @@
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
@@ -131,7 +132,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class AzureBlobFileSystemStore implements Closeable {
+public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
private AbfsClient client;
@@ -838,6 +839,7 @@ public FileStatus getFileStatus(final Path path) throws IOException {
* @param path The list path.
* @return the entries in the path.
* */
+ @Override
public FileStatus[] listStatus(final Path path) throws IOException {
return listStatus(path, null);
}
@@ -854,12 +856,14 @@ public FileStatus[] listStatus(final Path path) throws IOException {
* @return the entries in the path start from "startFrom" in lexical order.
* */
@InterfaceStability.Unstable
+ @Override
public FileStatus[] listStatus(final Path path, final String startFrom) throws IOException {
List fileStatuses = new ArrayList<>();
listStatus(path, startFrom, fileStatuses, true, null);
return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
}
+ @Override
public String listStatus(final Path path, final String startFrom,
List fileStatuses, final boolean fetchAll,
String continuation) throws IOException {
@@ -874,7 +878,7 @@ public String listStatus(final Path path, final String startFrom,
final String relativePath = getRelativePath(path);
- if (continuation == null || continuation.length() < 1) {
+ if (continuation == null || continuation.isEmpty()) {
// generate continuation token if a valid startFrom is provided.
if (startFrom != null && !startFrom.isEmpty()) {
continuation = getIsNamespaceEnabled()
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index cdef9c9b7ac07..8a9c63ddbe895 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -130,6 +130,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+ /** Setting this true will make the driver use its own RemoteIterator implementation */
+ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** End point of ABFS account: {@value}. */
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a23dfd5292bb8..9b760c472a9ad 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -101,5 +101,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins
+ public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
+
private FileSystemConfigurations() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
similarity index 87%
rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index 919d82352c826..be25288617b80 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -33,31 +33,30 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
-public class ListStatusRemoteIterator implements RemoteIterator {
+public class AbfsListStatusRemoteIterator implements RemoteIterator {
private static final Logger LOG = LoggerFactory
- .getLogger(ListStatusRemoteIterator.class);
+ .getLogger(AbfsListStatusRemoteIterator.class);
private static final boolean FETCH_ALL_FALSE = false;
private static final int MAX_QUEUE_SIZE = 10;
private final Path path;
- private final AzureBlobFileSystemStore abfsStore;
+ private final ListingSupport listingSupport;
private final ArrayBlockingQueue> iteratorsQueue;
private final Object asyncOpLock = new Object();
+ private volatile boolean isAsyncInProgress = false;
private boolean firstBatch = true;
- private boolean isAsyncInProgress = false;
private String continuation;
private Iterator currIterator;
private IOException ioException;
- public ListStatusRemoteIterator(final Path path,
- final AzureBlobFileSystemStore abfsStore) {
+ public AbfsListStatusRemoteIterator(final Path path,
+ final ListingSupport listingSupport) {
this.path = path;
- this.abfsStore = abfsStore;
+ this.listingSupport = listingSupport;
iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
currIterator = Collections.emptyIterator();
fetchBatchesAsync();
@@ -103,15 +102,11 @@ private void updateCurrentIterator() throws IOException {
}
}
- private boolean isListingComplete() {
+ private synchronized boolean isListingComplete() {
return !firstBatch && (continuation == null || continuation.isEmpty());
}
private void fetchBatchesAsync() {
- CompletableFuture.runAsync(() -> asyncOp());
- }
-
- private void asyncOp() {
if (isAsyncInProgress) {
return;
}
@@ -121,6 +116,10 @@ private void asyncOp() {
}
isAsyncInProgress = true;
}
+ CompletableFuture.runAsync(() -> asyncOp());
+ }
+
+ private void asyncOp() {
try {
while (!isListingComplete() && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
addNextBatchIteratorToQueue();
@@ -131,14 +130,16 @@ private void asyncOp() {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
} finally {
- isAsyncInProgress = false;
+ synchronized (asyncOpLock) {
+ isAsyncInProgress = false;
+ }
}
}
private synchronized void addNextBatchIteratorToQueue()
throws IOException, InterruptedException {
List fileStatuses = new ArrayList<>();
- continuation = abfsStore
+ continuation = listingSupport
.listStatus(path, null, fileStatuses, FETCH_ALL_FALSE, continuation);
iteratorsQueue.put(fileStatuses.iterator());
if (firstBatch) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
new file mode 100644
index 0000000000000..08378d68bc8e0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public interface ListingSupport {
+
+ /**
+ * @param path The list path.
+ * @return the entries in the path.
+ */
+ FileStatus[] listStatus(final Path path) throws IOException;
+
+ /**
+ * @param path Path the list path.
+ * @param startFrom The entry name that list results should start with.
+ * For example, if folder "/folder" contains four
+ * files: "afile", "bfile", "hfile", "ifile". Then
+ * listStatus(Path("/folder"), "hfile") will return
+ * "/folder/hfile" and "/folder/ifile". Notice that if
+ * startFrom is a non-existent entry name, then the
+ * list response contains all entries after this
+ * non-existent entry in lexical order: listStatus
+ * (Path("/folder"), "cfile") will return
+ * "/folder/hfile" and "/folder/ifile".
+ * @return the entries in the path start from "startFrom" in lexical order.
+ */
+ @InterfaceStability.Unstable
+ FileStatus[] listStatus(final Path path, final String startFrom)
+ throws IOException;
+
+ /**
+ * @param path The list path
+ * @param startFrom The entry name that list results should start with.
+ * For example, if folder "/folder" contains four
+ * files: "afile", "bfile", "hfile", "ifile". Then
+ * listStatus(Path("/folder"), "hfile") will return
+ * "/folder/hfile" and "/folder/ifile". Notice that if
+ * startFrom is a non-existent entry name, then the
+ * list response contains all entries after this
+ * non-existent entry in lexical order: listStatus
+ * (Path("/folder"), "cfile") will return
+ * "/folder/hfile" and "/folder/ifile".
+ * @param fileStatuses This list has to be filled with the FileStatus objects
+ * @param fetchAll flag to indicate if the above list needs to be
+ * filled with just one page of results or the entire
+ * result.
+ * @param continuation Continuation token. null means start from the beginning.
+ * @return Continuation token
+ * @throws IOException
+ */
+ String listStatus(Path path, String startFrom, List fileStatuses,
+ boolean fetchAll, String continuation) throws IOException;
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
new file mode 100644
index 0000000000000..7fdb80ce83d98
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test AbfsListStatusRemoteIterator operation.
+ */
+public class ITestAbfsListStatusIterator extends AbstractAbfsIntegrationTest {
+
+ private static final int TEST_FILES_NUMBER = 1000;
+
+ public ITestAbfsListStatusIterator() throws Exception {
+ super();
+ }
+
+ @Test
+ public void testListStatusRemoteIterator() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+ RemoteIterator fsItr = new AbfsListStatusRemoteIterator(
+ getFileSystem().makeQualified(testDir), listngSupport);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should be instance of "
+ + "AbfsListStatusRemoteIterator by default")
+ .isInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ while (fsItr.hasNext()) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ verify(listngSupport, Mockito.atLeast(100))
+ .listStatus(any(Path.class), nullable(String.class),
+ anyList(), anyBoolean(),
+ nullable(String.class));
+ }
+
+ @Test
+ public void testListStatusRemoteIteratorWithoutHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
+ RemoteIterator fsItr = new AbfsListStatusRemoteIterator(
+ getFileSystem().makeQualified(testDir), listngSupport);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should be instance of "
+ + "AbfsListStatusRemoteIterator by default")
+ .isInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException since next has been "
+ + "called " + TEST_FILES_NUMBER + " times")
+ .isInstanceOf(NoSuchElementException.class);
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ verify(listngSupport, Mockito.atLeast(100))
+ .listStatus(any(Path.class), nullable(String.class),
+ anyList(), anyBoolean(),
+ nullable(String.class));
+ }
+
+ @Test
+ public void testWithAbfsIteratorDisabled() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ setEnableAbfsIterator(false);
+ final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ RemoteIterator fsItr =
+ getFileSystem().listStatusIterator(testDir);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should not be instance of "
+ + "AbfsListStatusRemoteIterator when it is disabled")
+ .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ while (fsItr.hasNext()) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testWithAbfsIteratorDisabledWithutHasNext() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ setEnableAbfsIterator(false);
+ final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
+ testDir, "testListPath");
+
+ RemoteIterator fsItr =
+ getFileSystem().listStatusIterator(testDir);
+ Assertions.assertThat(fsItr)
+ .describedAs("RemoteIterator should not be instance of "
+ + "AbfsListStatusRemoteIterator when it is disabled")
+ .isNotInstanceOf(AbfsListStatusRemoteIterator.class);
+ int itrCount = 0;
+ for (int i = 0; i < TEST_FILES_NUMBER; i++) {
+ FileStatus fileStatus = fsItr.next();
+ String pathStr = fileStatus.getPath().toString();
+ fileNames.remove(pathStr);
+ itrCount++;
+ }
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException since next has been "
+ + "called " + TEST_FILES_NUMBER + " times")
+ .isInstanceOf(NoSuchElementException.class);
+ Assertions.assertThat(itrCount)
+ .describedAs("Number of iterations should be equal to the files "
+ + "created")
+ .isEqualTo(TEST_FILES_NUMBER);
+ Assertions.assertThat(fileNames.size())
+ .describedAs("After removing every iterm found from the iterator, "
+ + "there should be no more elements in the fileNames")
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void testNextWhenNoMoreElementsPresent() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ RemoteIterator fsItr =
+ new AbfsListStatusRemoteIterator(getFileSystem().makeQualified(testDir),
+ getFileSystem().getAbfsStore());
+ fsItr = Mockito.spy(fsItr);
+ Mockito.doReturn(false).when(fsItr).hasNext();
+
+ RemoteIterator finalFsItr = fsItr;
+ Assertions.assertThatThrownBy(() -> finalFsItr.next())
+ .describedAs(
+ "next() should throw NoSuchElementException if hasNext() return "
+ + "false")
+ .isInstanceOf(NoSuchElementException.class);
+ }
+
+ @Test
+ public void testHasNextForEmptyDir() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ RemoteIterator fsItr = getFileSystem()
+ .listStatusIterator(testDir);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns false for empty directory")
+ .isFalse();
+ }
+
+ @Test
+ public void testHasNextForFile() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ String testFileName = "testFile";
+ Path testFile = new Path(testFileName);
+ getFileSystem().create(testFile);
+ setPageSize(10);
+ RemoteIterator fsItr = fs.listStatusIterator(testFile);
+ Assertions.assertThat(fsItr.hasNext())
+ .describedAs("hasNext returns true for file").isTrue();
+ Assertions.assertThat(fsItr.next().getPath().toString())
+ .describedAs("next returns the file itself")
+ .endsWith(testFileName);
+ }
+
+ @Test
+ public void testIOException() throws Exception {
+ Path testDir = createTestDirectory();
+ setPageSize(10);
+ getFileSystem().mkdirs(testDir);
+
+ String exceptionMessage = "test exception";
+ ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
+ RemoteIterator fsItr =
+ new AbfsListStatusRemoteIterator(getFileSystem().makeQualified(testDir),
+ lsSupport);
+
+ Assertions.assertThatThrownBy(() -> fsItr.next())
+ .describedAs(
+ "When ioException is not null and queue is empty exception should be "
+ + "thrown")
+ .isInstanceOf(IOException.class)
+ .hasMessage(exceptionMessage);
+ }
+
+ private ListingSupport getMockListingSupport(String exceptionMessage) {
+ return new ListingSupport() {
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path, String startFrom)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public String listStatus(Path path, String startFrom,
+ List fileStatuses, boolean fetchAll, String continuation)
+ throws IOException {
+ throw new IOException(exceptionMessage);
+ }
+ };
+ }
+
+ private Path createTestDirectory() throws IOException {
+ String testDirectoryName = "testDirectory" + System.currentTimeMillis();
+ Path testDirectory = new Path(testDirectoryName);
+ getFileSystem().mkdirs(testDirectory);
+ return testDirectory;
+ }
+
+ private void setEnableAbfsIterator(boolean shouldEnable) throws IOException {
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+ abfsStore.getAbfsConfiguration().setEnableAbfsListIterator(shouldEnable);
+ }
+
+ private void setPageSize(int pageSize) throws IOException {
+ AzureBlobFileSystemStore abfsStore = getAbfsStore(getFileSystem());
+ abfsStore.getAbfsConfiguration().setListMaxResults(pageSize);
+ }
+
+ private List createFilesUnderDirectory(int numFiles, Path rootPath,
+ String filenamePrefix)
+ throws ExecutionException, InterruptedException, IOException {
+ final List> tasks = new ArrayList<>();
+ final List fileNames = new ArrayList<>();
+ ExecutorService es = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < numFiles; i++) {
+ final Path filePath = new Path(rootPath, filenamePrefix + i);
+ Callable callable = new Callable() {
+ @Override
+ public Void call() throws Exception {
+ getFileSystem().create(filePath);
+ fileNames.add(makeQualified(filePath).toString());
+ return null;
+ }
+ };
+ tasks.add(es.submit(callable));
+ }
+ for (Future task : tasks) {
+ task.get();
+ }
+ es.shutdownNow();
+ return fileNames;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
deleted file mode 100644
index fb99988ee9a8e..0000000000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatusIterator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.assertj.core.api.Assertions;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import org.apache.hadoop.fs.azurebfs.services.ListStatusRemoteIterator;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-/**
- * Test listStatus operation.
- */
-public class ITestAzureBlobFileSystemListStatusIterator
- extends AbstractAbfsIntegrationTest {
-
- private static final int TEST_FILES_NUMBER = 1000;
-
- public ITestAzureBlobFileSystemListStatusIterator() throws Exception {
- super();
- }
-
- @Test
- public void testListPath() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- String rootPath = "testRoot1";
- final List fileNames = createFiles(TEST_FILES_NUMBER, rootPath,
- "testListPath");
- AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
- abfsStore.getAbfsConfiguration().setListMaxResults(10);
- RemoteIterator fsIt = fs.listStatusIterator(new Path(rootPath));
- int itrCount = 0;
- while (fsIt.hasNext()) {
- FileStatus fileStatus = fsIt.next();
- String pathStr = fileStatus.getPath().toString();
- fileNames.remove(pathStr);
- itrCount++;
- }
- assertEquals(TEST_FILES_NUMBER, itrCount);
- assertEquals(0, fileNames.size());
- }
-
- @Test
- public void testNextWhenNoMoreElementsPresent() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- String rootPathStr = "testRoot2";
- Path rootPath = new Path(rootPathStr);
- getFileSystem().mkdirs(rootPath);
- RemoteIterator fsItr = fs.listStatusIterator(rootPath);
- fsItr = Mockito.spy(fsItr);
- Mockito.doReturn(false).when(fsItr).hasNext();
-
- RemoteIterator finalFsItr = fsItr;
- Assertions.assertThatThrownBy(() -> finalFsItr.next()).describedAs(
- "next() should throw NoSuchElementException if hasNext() return "
- + "false").isInstanceOf(NoSuchElementException.class);
- }
-
- @Test
- public void testHasNextForEmptyDir() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- String rootPathStr = "testRoot3";
- Path rootPath = new Path(rootPathStr);
- getFileSystem().mkdirs(rootPath);
- RemoteIterator fsItr = fs.listStatusIterator(rootPath);
- Assertions.assertThat(fsItr.hasNext())
- .describedAs("hasNext returns false for empty directory").isFalse();
- }
-
- @Test
- public void testHasNextForFile() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- String rootPathStr = "testRoot4";
- Path rootPath = new Path(rootPathStr);
- getFileSystem().create(rootPath);
- RemoteIterator fsItr = fs.listStatusIterator(rootPath);
- Assertions.assertThat(fsItr.hasNext())
- .describedAs("hasNext returns true for file").isTrue();
- }
-
- @Test
- public void testHasNextForIOException() throws Exception {
- final AzureBlobFileSystem fs = getFileSystem();
- String rootPathStr = "testRoot5";
- Path rootPath = new Path(rootPathStr);
- getFileSystem().mkdirs(rootPath);
- ListStatusRemoteIterator fsItr = (ListStatusRemoteIterator) fs
- .listStatusIterator(rootPath);
- Thread.sleep(1000);
-
- String exceptionMessage = "test exception";
- setPrivateField(fsItr, ListStatusRemoteIterator.class, "ioException",
- new IOException(exceptionMessage));
- setPrivateFinalField(fsItr, ListStatusRemoteIterator.class,
- "iteratorsQueue", new ArrayBlockingQueue(1));
-
- Assertions.assertThatThrownBy(() -> fsItr.hasNext()).describedAs(
- "When ioException is not null and queue is empty exception should be "
- + "thrown").isInstanceOf(IOException.class)
- .hasMessage(exceptionMessage);
- }
-
- private void setPrivateField(Object obj, Class classObj, String fieldName,
- Object value) throws NoSuchFieldException, IllegalAccessException {
- Field field = classObj.getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(obj, value);
- }
-
- private void setPrivateFinalField(Object obj, Class classObj,
- String fieldName, Object value)
- throws NoSuchFieldException, IllegalAccessException {
- Field field = classObj.getDeclaredField(fieldName);
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
- field.set(obj, value);
- }
-
- private List createFiles(int numFiles, String rootPathStr,
- String filenamePrefix)
- throws ExecutionException, InterruptedException, IOException {
- final List> tasks = new ArrayList<>();
- final List fileNames = new ArrayList<>();
- ExecutorService es = Executors.newFixedThreadPool(10);
- final Path rootPath = new Path(rootPathStr);
- for (int i = 0; i < numFiles; i++) {
- final Path filePath = new Path(rootPath, filenamePrefix + i);
- Callable callable = new Callable() {
- @Override
- public Void call() throws Exception {
- getFileSystem().create(filePath);
- fileNames.add(makeQualified(filePath).toString());
- return null;
- }
- };
- tasks.add(es.submit(callable));
- }
- for (Future task : tasks) {
- task.get();
- }
- es.shutdownNow();
- return fileNames;
- }
-
- private AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
- throws NoSuchFieldException, IllegalAccessException {
- AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
- Field abfsStoreField = AzureBlobFileSystem.class
- .getDeclaredField("abfsStore");
- abfsStoreField.setAccessible(true);
- return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
- }
-
-}
From a6e54076877b4ee4d3e5d2acacacbd10e70bee1b Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Fri, 15 Jan 2021 11:07:37 +0530
Subject: [PATCH 06/17] Putting empty iterator in finally block. This is to
prevent the take from hanging in case the first call itself results in an
exception
---
.../fs/azurebfs/services/AbfsListStatusRemoteIterator.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index be25288617b80..1b62fa4d403c1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -131,6 +131,12 @@ private void asyncOp() {
LOG.error("Thread got interrupted: {}", e);
} finally {
synchronized (asyncOpLock) {
+ try {
+ iteratorsQueue.put(Collections.emptyIterator());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", e);
+ }
isAsyncInProgress = false;
}
}
From b9ff82c25d3000b4f531bd0b397b7610919ab31c Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Fri, 15 Jan 2021 15:42:18 +0530
Subject: [PATCH 07/17] Throwing FileNotFoundException when the directory does
not exist
---
.../fs/azurebfs/AzureBlobFileSystem.java | 3 +--
.../AbfsListStatusRemoteIterator.java | 9 +++++----
.../azurebfs/ITestAbfsListStatusIterator.java | 20 +++++++++++++++----
3 files changed, 22 insertions(+), 10 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index f6f8186053659..25d8d0b4178db 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -990,9 +990,8 @@ public RemoteIterator listStatusIterator(Path path)
throws IOException {
LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path);
if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
- Path qualifiedPath = makeQualified(path);
AbfsListStatusRemoteIterator abfsLsItr =
- new AbfsListStatusRemoteIterator(qualifiedPath, abfsStore);
+ new AbfsListStatusRemoteIterator(getFileStatus(path), abfsStore);
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
} else {
return super.listStatusIterator(path);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index 1b62fa4d403c1..5b06019e03715 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -42,7 +42,7 @@ public class AbfsListStatusRemoteIterator implements RemoteIterator
private static final boolean FETCH_ALL_FALSE = false;
private static final int MAX_QUEUE_SIZE = 10;
- private final Path path;
+ private final FileStatus fileStatus;
private final ListingSupport listingSupport;
private final ArrayBlockingQueue> iteratorsQueue;
private final Object asyncOpLock = new Object();
@@ -53,9 +53,9 @@ public class AbfsListStatusRemoteIterator implements RemoteIterator
private Iterator currIterator;
private IOException ioException;
- public AbfsListStatusRemoteIterator(final Path path,
+ public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
final ListingSupport listingSupport) {
- this.path = path;
+ this.fileStatus = fileStatus;
this.listingSupport = listingSupport;
iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
currIterator = Collections.emptyIterator();
@@ -146,7 +146,8 @@ private synchronized void addNextBatchIteratorToQueue()
throws IOException, InterruptedException {
List fileStatuses = new ArrayList<>();
continuation = listingSupport
- .listStatus(path, null, fileStatuses, FETCH_ALL_FALSE, continuation);
+ .listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
+ continuation);
iteratorsQueue.put(fileStatuses.iterator());
if (firstBatch) {
firstBatch = false;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
index 7fdb80ce83d98..361a6673dd948 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,6 +29,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
@@ -64,7 +66,7 @@ public void testListStatusRemoteIterator() throws Exception {
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
RemoteIterator fsItr = new AbfsListStatusRemoteIterator(
- getFileSystem().makeQualified(testDir), listngSupport);
+ getFileSystem().getFileStatus(testDir), listngSupport);
Assertions.assertThat(fsItr)
.describedAs("RemoteIterator should be instance of "
+ "AbfsListStatusRemoteIterator by default")
@@ -99,7 +101,7 @@ public void testListStatusRemoteIteratorWithoutHasNext() throws Exception {
ListingSupport listngSupport = Mockito.spy(getFileSystem().getAbfsStore());
RemoteIterator fsItr = new AbfsListStatusRemoteIterator(
- getFileSystem().makeQualified(testDir), listngSupport);
+ getFileSystem().getFileStatus(testDir), listngSupport);
Assertions.assertThat(fsItr)
.describedAs("RemoteIterator should be instance of "
+ "AbfsListStatusRemoteIterator by default")
@@ -202,7 +204,7 @@ public void testNextWhenNoMoreElementsPresent() throws Exception {
Path testDir = createTestDirectory();
setPageSize(10);
RemoteIterator fsItr =
- new AbfsListStatusRemoteIterator(getFileSystem().makeQualified(testDir),
+ new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
getFileSystem().getAbfsStore());
fsItr = Mockito.spy(fsItr);
Mockito.doReturn(false).when(fsItr).hasNext();
@@ -250,7 +252,7 @@ public void testIOException() throws Exception {
String exceptionMessage = "test exception";
ListingSupport lsSupport =getMockListingSupport(exceptionMessage);
RemoteIterator fsItr =
- new AbfsListStatusRemoteIterator(getFileSystem().makeQualified(testDir),
+ new AbfsListStatusRemoteIterator(getFileSystem().getFileStatus(testDir),
lsSupport);
Assertions.assertThatThrownBy(() -> fsItr.next())
@@ -261,6 +263,16 @@ public void testIOException() throws Exception {
.hasMessage(exceptionMessage);
}
+ @Test
+ public void testNonExistingPath() throws Throwable {
+ Path nonExistingDir = new Path("nonExistingPath");
+ Assertions.assertThatThrownBy(
+ () -> getFileSystem().listStatusIterator(nonExistingDir)).describedAs(
+ "test the listStatusIterator call on a path which is not "
+ + "present should result in FileNotFoundException")
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
private ListingSupport getMockListingSupport(String exceptionMessage) {
return new ListingSupport() {
@Override
From f2b3ad42d2a79f33f49135a5c44ab29a41f9caa1 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Fri, 15 Jan 2021 16:38:20 +0530
Subject: [PATCH 08/17] Making the put on finally non blocking
---
.../fs/azurebfs/services/AbfsListStatusRemoteIterator.java | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index 5b06019e03715..462508309fc15 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -131,13 +131,8 @@ private void asyncOp() {
LOG.error("Thread got interrupted: {}", e);
} finally {
synchronized (asyncOpLock) {
- try {
- iteratorsQueue.put(Collections.emptyIterator());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Thread got interrupted: {}", e);
- }
isAsyncInProgress = false;
+ iteratorsQueue.offer(Collections.emptyIterator());
}
}
}
From d3cac7771de5e53ab1c761d55b9dc1d9a295392f Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Mon, 18 Jan 2021 11:01:56 +0530
Subject: [PATCH 09/17] Adding empty iterator from the catch block
---
.../org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java | 2 +-
.../fs/azurebfs/services/AbfsListStatusRemoteIterator.java | 3 +--
.../apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java | 1 -
3 files changed, 2 insertions(+), 4 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 25d8d0b4178db..63ef7cbd6f084 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -39,7 +39,6 @@
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.util.functional.RemoteIterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +80,7 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index 462508309fc15..f2fbcb37b56fd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -31,7 +31,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
public class AbfsListStatusRemoteIterator implements RemoteIterator {
@@ -126,13 +125,13 @@ private void asyncOp() {
}
} catch (IOException e) {
ioException = e;
+ iteratorsQueue.offer(Collections.emptyIterator());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
} finally {
synchronized (asyncOpLock) {
isAsyncInProgress = false;
- iteratorsQueue.offer(Collections.emptyIterator());
}
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
index 361a6673dd948..e7b1f4cd16922 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
@@ -29,7 +29,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.hadoop.test.LambdaTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
From 7c4f64a45183bd03677c68c241642e41530cf88e Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Thu, 21 Jan 2021 10:48:47 +0530
Subject: [PATCH 10/17] Addressing review comments
---
.../fs/azurebfs/AzureBlobFileSystem.java | 2 +-
.../AbfsListStatusRemoteIterator.java | 56 +++++++++++--------
...=> ITestAbfsListStatusRemoteIterator.java} | 15 ++---
3 files changed, 39 insertions(+), 34 deletions(-)
rename hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/{ITestAbfsListStatusIterator.java => ITestAbfsListStatusRemoteIterator.java} (96%)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 63ef7cbd6f084..da7759f2700a6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -45,8 +45,8 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index f2fbcb37b56fd..ce5bc7708fb3e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -43,14 +43,13 @@ public class AbfsListStatusRemoteIterator implements RemoteIterator
private final FileStatus fileStatus;
private final ListingSupport listingSupport;
- private final ArrayBlockingQueue> iteratorsQueue;
+ private final ArrayBlockingQueue iteratorsQueue;
private final Object asyncOpLock = new Object();
private volatile boolean isAsyncInProgress = false;
- private boolean firstBatch = true;
+ private boolean isIterationComplete = false;
private String continuation;
private Iterator currIterator;
- private IOException ioException;
public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
final ListingSupport listingSupport) {
@@ -67,6 +66,9 @@ public boolean hasNext() throws IOException {
return true;
}
updateCurrentIterator();
+ if (currIterator == null) {
+ return false;
+ }
return currIterator.hasNext();
}
@@ -79,32 +81,32 @@ public FileStatus next() throws IOException {
}
private void updateCurrentIterator() throws IOException {
+ do {
+ currIterator = getNextIterator();
+ } while (currIterator != null && !currIterator.hasNext()
+ && !isIterationComplete);
+ }
+
+ private Iterator getNextIterator() throws IOException {
fetchBatchesAsync();
synchronized (this) {
- if (iteratorsQueue.isEmpty()) {
- if (ioException != null) {
- throw ioException;
- }
- if (isListingComplete()) {
- return;
- }
+ if (iteratorsQueue.isEmpty() && isIterationComplete) {
+ return null;
}
}
try {
- currIterator = iteratorsQueue.take();
- if (!currIterator.hasNext() && !isListingComplete()) {
- updateCurrentIterator();
+ Object obj = iteratorsQueue.take();
+ if(obj instanceof Iterator){
+ return (Iterator) obj;
}
+ throw (IOException) obj;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
+ return null;
}
}
- private synchronized boolean isListingComplete() {
- return !firstBatch && (continuation == null || continuation.isEmpty());
- }
-
private void fetchBatchesAsync() {
if (isAsyncInProgress) {
return;
@@ -120,12 +122,16 @@ private void fetchBatchesAsync() {
private void asyncOp() {
try {
- while (!isListingComplete() && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
+ while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
addNextBatchIteratorToQueue();
}
} catch (IOException e) {
- ioException = e;
- iteratorsQueue.offer(Collections.emptyIterator());
+ try {
+ iteratorsQueue.put(e);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ LOG.error("Thread got interrupted: {}", interruptedException);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
@@ -136,15 +142,17 @@ private void asyncOp() {
}
}
- private synchronized void addNextBatchIteratorToQueue()
+ private void addNextBatchIteratorToQueue()
throws IOException, InterruptedException {
List fileStatuses = new ArrayList<>();
continuation = listingSupport
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
continuation);
- iteratorsQueue.put(fileStatuses.iterator());
- if (firstBatch) {
- firstBatch = false;
+ synchronized (this) {
+ if (continuation == null || continuation.isEmpty()) {
+ isIterationComplete = true;
+ }
+ iteratorsQueue.put(fileStatuses.iterator());
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
similarity index 96%
rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
index e7b1f4cd16922..037906f0c5116 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -48,11 +48,11 @@
/**
* Test ListStatusRemoteIterator operation.
*/
-public class ITestAbfsListStatusIterator extends AbstractAbfsIntegrationTest {
+public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTest {
private static final int TEST_FILES_NUMBER = 1000;
- public ITestAbfsListStatusIterator() throws Exception {
+ public ITestAbfsListStatusRemoteIterator() throws Exception {
super();
}
@@ -319,13 +319,10 @@ private List createFilesUnderDirectory(int numFiles, Path rootPath,
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < numFiles; i++) {
final Path filePath = new Path(rootPath, filenamePrefix + i);
- Callable callable = new Callable() {
- @Override
- public Void call() throws Exception {
- getFileSystem().create(filePath);
- fileNames.add(makeQualified(filePath).toString());
- return null;
- }
+ Callable callable = () -> {
+ getFileSystem().create(filePath);
+ fileNames.add(makeQualified(filePath).toString());
+ return null;
};
tasks.add(es.submit(callable));
}
From f578bbd7c776751dfbfc63b1e586c2308c3f2501 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Mon, 25 Jan 2021 21:04:59 +0530
Subject: [PATCH 11/17] Addressing review comments
---
.../org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java | 2 +-
.../azurebfs/services/AbfsListStatusRemoteIterator.java | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e9cb6ba5dc3b0..193be48029a34 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -276,7 +276,7 @@ public class AbfsConfiguration{
private long sasTokenRenewPeriodForStreamsInSeconds;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
- FS_AZURE_ENABLE_FLUSH, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
+ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;
public AbfsConfiguration(final Configuration rawConfig, String accountName)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index ce5bc7708fb3e..ff7bec8a81322 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -44,7 +44,6 @@ public class AbfsListStatusRemoteIterator implements RemoteIterator
private final FileStatus fileStatus;
private final ListingSupport listingSupport;
private final ArrayBlockingQueue iteratorsQueue;
- private final Object asyncOpLock = new Object();
private volatile boolean isAsyncInProgress = false;
private boolean isIterationComplete = false;
@@ -111,7 +110,7 @@ private void fetchBatchesAsync() {
if (isAsyncInProgress) {
return;
}
- synchronized (asyncOpLock) {
+ synchronized (this) {
if (isAsyncInProgress) {
return;
}
@@ -136,7 +135,7 @@ private void asyncOp() {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
} finally {
- synchronized (asyncOpLock) {
+ synchronized (this ) {
isAsyncInProgress = false;
}
}
@@ -148,11 +147,12 @@ private void addNextBatchIteratorToQueue()
continuation = listingSupport
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
continuation);
+ iteratorsQueue.put(fileStatuses.iterator());
synchronized (this) {
if (continuation == null || continuation.isEmpty()) {
isIterationComplete = true;
+ iteratorsQueue.put(Collections.emptyIterator());
}
- iteratorsQueue.put(fileStatuses.iterator());
}
}
From 89222d1f2d56cafe5075b5b570a3ca3a73f25b42 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Mon, 25 Jan 2021 21:08:15 +0530
Subject: [PATCH 12/17] Addressing review comments
---
.../fs/azurebfs/services/AbfsListStatusRemoteIterator.java | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index ff7bec8a81322..3447d778e645f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -65,9 +65,6 @@ public boolean hasNext() throws IOException {
return true;
}
updateCurrentIterator();
- if (currIterator == null) {
- return false;
- }
return currIterator.hasNext();
}
@@ -90,7 +87,7 @@ private Iterator getNextIterator() throws IOException {
fetchBatchesAsync();
synchronized (this) {
if (iteratorsQueue.isEmpty() && isIterationComplete) {
- return null;
+ return Collections.emptyIterator();
}
}
try {
@@ -102,7 +99,7 @@ private Iterator getNextIterator() throws IOException {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
- return null;
+ return Collections.emptyIterator();
}
}
From 5970d6ec245c2bfb273435733de5b76f2ddcb22b Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 27 Jan 2021 15:29:13 +0530
Subject: [PATCH 13/17] Addressing review comments. Checkstyle fixes.
---
.../AbfsListStatusRemoteIterator.java | 53 ++++++++++---------
.../fs/azurebfs/services/ListingSupport.java | 5 +-
.../ITestAbfsListStatusRemoteIterator.java | 12 +++--
3 files changed, 37 insertions(+), 33 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index 3447d778e645f..42eddf53ae88f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -26,6 +26,8 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.activation.UnsupportedDataTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,13 +35,15 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
-public class AbfsListStatusRemoteIterator implements RemoteIterator {
+public class AbfsListStatusRemoteIterator
+ implements RemoteIterator {
private static final Logger LOG = LoggerFactory
.getLogger(AbfsListStatusRemoteIterator.class);
private static final boolean FETCH_ALL_FALSE = false;
private static final int MAX_QUEUE_SIZE = 10;
+ private static final long POLL_WAIT_TIME_IN_MS = 250;
private final FileStatus fileStatus;
private final ListingSupport listingSupport;
@@ -64,7 +68,7 @@ public boolean hasNext() throws IOException {
if (currIterator.hasNext()) {
return true;
}
- updateCurrentIterator();
+ currIterator = getNextIterator();
return currIterator.hasNext();
}
@@ -76,39 +80,36 @@ public FileStatus next() throws IOException {
return currIterator.next();
}
- private void updateCurrentIterator() throws IOException {
- do {
- currIterator = getNextIterator();
- } while (currIterator != null && !currIterator.hasNext()
- && !isIterationComplete);
- }
-
private Iterator getNextIterator() throws IOException {
fetchBatchesAsync();
- synchronized (this) {
- if (iteratorsQueue.isEmpty() && isIterationComplete) {
- return Collections.emptyIterator();
- }
- }
try {
- Object obj = iteratorsQueue.take();
- if(obj instanceof Iterator){
+ Object obj = null;
+ while (obj == null
+ && (!isIterationComplete || !iteratorsQueue.isEmpty())) {
+ obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
+ }
+ if (obj == null) {
+ return Collections.emptyIterator();
+ } else if (obj instanceof Iterator) {
return (Iterator) obj;
+ } else if (obj instanceof IOException) {
+ throw (IOException) obj;
+ } else {
+ throw new UnsupportedDataTypeException();
}
- throw (IOException) obj;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
- return Collections.emptyIterator();
+ throw new IOException(e);
}
}
private void fetchBatchesAsync() {
- if (isAsyncInProgress) {
+ if (isAsyncInProgress || isIterationComplete) {
return;
}
synchronized (this) {
- if (isAsyncInProgress) {
+ if (isAsyncInProgress || isIterationComplete) {
return;
}
isAsyncInProgress = true;
@@ -121,9 +122,10 @@ private void asyncOp() {
while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
addNextBatchIteratorToQueue();
}
- } catch (IOException e) {
+ } catch (IOException ioe) {
+ LOG.error("Fetching filestatuses failed", ioe);
try {
- iteratorsQueue.put(e);
+ iteratorsQueue.put(ioe);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", interruptedException);
@@ -132,7 +134,7 @@ private void asyncOp() {
Thread.currentThread().interrupt();
LOG.error("Thread got interrupted: {}", e);
} finally {
- synchronized (this ) {
+ synchronized (this) {
isAsyncInProgress = false;
}
}
@@ -144,11 +146,12 @@ private void addNextBatchIteratorToQueue()
continuation = listingSupport
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
continuation);
- iteratorsQueue.put(fileStatuses.iterator());
+ if(!fileStatuses.isEmpty()) {
+ iteratorsQueue.put(fileStatuses.iterator());
+ }
synchronized (this) {
if (continuation == null || continuation.isEmpty()) {
isIterationComplete = true;
- iteratorsQueue.put(Collections.emptyIterator());
}
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
index 08378d68bc8e0..dc3203f11e4d5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -31,7 +31,7 @@ public interface ListingSupport {
* @param path The list path.
* @return the entries in the path.
*/
- FileStatus[] listStatus(final Path path) throws IOException;
+ FileStatus[] listStatus(Path path) throws IOException;
/**
* @param path Path the list path.
@@ -48,8 +48,7 @@ public interface ListingSupport {
* @return the entries in the path start from "startFrom" in lexical order.
*/
@InterfaceStability.Unstable
- FileStatus[] listStatus(final Path path, final String startFrom)
- throws IOException;
+ FileStatus[] listStatus(Path path, String startFrom) throws IOException;
/**
* @param path The list path
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
index 037906f0c5116..5e6034827e4ee 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -57,7 +57,7 @@ public ITestAbfsListStatusRemoteIterator() throws Exception {
}
@Test
- public void testListStatusRemoteIterator() throws Exception {
+ public void testAbfsIteratorWithHasNext() throws Exception {
Path testDir = createTestDirectory();
setPageSize(10);
final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
@@ -85,14 +85,15 @@ public void testListStatusRemoteIterator() throws Exception {
.describedAs("After removing every iterm found from the iterator, "
+ "there should be no more elements in the fileNames")
.isEqualTo(0);
- verify(listngSupport, Mockito.atLeast(100))
+ int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
+ verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
.listStatus(any(Path.class), nullable(String.class),
anyList(), anyBoolean(),
nullable(String.class));
}
@Test
- public void testListStatusRemoteIteratorWithoutHasNext() throws Exception {
+ public void testAbfsIteratorWithoutHasNext() throws Exception {
Path testDir = createTestDirectory();
setPageSize(10);
final List fileNames = createFilesUnderDirectory(TEST_FILES_NUMBER,
@@ -125,7 +126,8 @@ public void testListStatusRemoteIteratorWithoutHasNext() throws Exception {
.describedAs("After removing every iterm found from the iterator, "
+ "there should be no more elements in the fileNames")
.isEqualTo(0);
- verify(listngSupport, Mockito.atLeast(100))
+ int minNumberOfInvokations = TEST_FILES_NUMBER / 10;
+ verify(listngSupport, Mockito.atLeast(minNumberOfInvokations))
.listStatus(any(Path.class), nullable(String.class),
anyList(), anyBoolean(),
nullable(String.class));
@@ -163,7 +165,7 @@ public void testWithAbfsIteratorDisabled() throws Exception {
}
@Test
- public void testWithAbfsIteratorDisabledWithutHasNext() throws Exception {
+ public void testWithAbfsIteratorDisabledWithoutHasNext() throws Exception {
Path testDir = createTestDirectory();
setPageSize(10);
setEnableAbfsIterator(false);
From 9b8053c7a32cce97aab5b256452132b4a397c2e9 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Tue, 2 Feb 2021 20:30:50 +0530
Subject: [PATCH 14/17] Addressing review comments
---
.../AbfsListStatusRemoteIterator.java | 2 +-
.../fs/azurebfs/services/ListingSupport.java | 4 ++-
.../ITestAbfsListStatusRemoteIterator.java | 28 ++++++++++---------
3 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
index 42eddf53ae88f..0c664fc2fbbc4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsListStatusRemoteIterator.java
@@ -146,7 +146,7 @@ private void addNextBatchIteratorToQueue()
continuation = listingSupport
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
continuation);
- if(!fileStatuses.isEmpty()) {
+ if (!fileStatuses.isEmpty()) {
iteratorsQueue.put(fileStatuses.iterator());
}
synchronized (this) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
index dc3203f11e4d5..168757d94c8ef 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -21,10 +21,13 @@
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
public interface ListingSupport {
/**
@@ -47,7 +50,6 @@ public interface ListingSupport {
* "/folder/hfile" and "/folder/ifile".
* @return the entries in the path start from "startFrom" in lexical order.
*/
- @InterfaceStability.Unstable
FileStatus[] listStatus(Path path, String startFrom) throws IOException;
/**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
index 5e6034827e4ee..6d5e4cf3bce2d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsListStatusRemoteIterator.java
@@ -53,7 +53,6 @@ public class ITestAbfsListStatusRemoteIterator extends AbstractAbfsIntegrationTe
private static final int TEST_FILES_NUMBER = 1000;
public ITestAbfsListStatusRemoteIterator() throws Exception {
- super();
}
@Test
@@ -319,19 +318,22 @@ private List createFilesUnderDirectory(int numFiles, Path rootPath,
final List> tasks = new ArrayList<>();
final List fileNames = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(10);
- for (int i = 0; i < numFiles; i++) {
- final Path filePath = new Path(rootPath, filenamePrefix + i);
- Callable callable = () -> {
- getFileSystem().create(filePath);
- fileNames.add(makeQualified(filePath).toString());
- return null;
- };
- tasks.add(es.submit(callable));
- }
- for (Future task : tasks) {
- task.get();
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ final Path filePath = new Path(rootPath, filenamePrefix + i);
+ Callable callable = () -> {
+ getFileSystem().create(filePath);
+ fileNames.add(makeQualified(filePath).toString());
+ return null;
+ };
+ tasks.add(es.submit(callable));
+ }
+ for (Future task : tasks) {
+ task.get();
+ }
+ } finally {
+ es.shutdownNow();
}
- es.shutdownNow();
return fileNames;
}
From 919245daaa42c9b46903f198f8eba58d21226998 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 3 Feb 2021 17:07:36 +0530
Subject: [PATCH 15/17] To ignore the findbug warning related to continuation
---
.../hadoop-azure/dev-support/findbugs-exclude.xml | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index 7087d786a3fa9..6811f3dc72eb0 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -74,4 +74,13 @@
+
+
+
+
+
+
+
+
From 51bd08a71719310ad11b22f107fd5a4378d27271 Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 3 Feb 2021 17:11:30 +0530
Subject: [PATCH 16/17] Fixing javadoc issues
---
.../apache/hadoop/fs/azurebfs/services/ListingSupport.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
index 168757d94c8ef..4c449409aafde 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java
@@ -33,6 +33,7 @@ public interface ListingSupport {
/**
* @param path The list path.
* @return the entries in the path.
+ * @throws IOException in case of error
*/
FileStatus[] listStatus(Path path) throws IOException;
@@ -49,6 +50,7 @@ public interface ListingSupport {
* (Path("/folder"), "cfile") will return
* "/folder/hfile" and "/folder/ifile".
* @return the entries in the path start from "startFrom" in lexical order.
+ * @throws IOException in case of error
*/
FileStatus[] listStatus(Path path, String startFrom) throws IOException;
@@ -70,7 +72,7 @@ public interface ListingSupport {
* result.
* @param continuation Contiuation token. null means start rom the begining.
* @return Continuation tokem
- * @throws IOException
+ * @throws IOException in case of error
*/
String listStatus(Path path, String startFrom, List fileStatuses,
boolean fetchAll, String continuation) throws IOException;
From b01659a20c2a26fdf45a19e073dc2eb8b76d6f6a Mon Sep 17 00:00:00 2001
From: Bilahari T H
Date: Wed, 3 Feb 2021 19:57:55 +0530
Subject: [PATCH 17/17] findbugs fix
---
hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index 6811f3dc72eb0..b750b8b91c79e 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -79,7 +79,7 @@
outside synchronized block since the same is costly. -->
-
+