diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
new file mode 100644
index 0000000000000..7ae518779f1fb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A map of keys of type K to objects of type V which uses weak references,
+ * so does not leak memory through long-lived references,
+ * at the expense of losing references when GC takes place.
+ *
+ * This class is intended to be used instead of ThreadLocal storage when
+ * references are to be cleaned up once the instance holding them
+ * is no longer reachable. In this use case, the key is the Long thread ID.
+ *
+ * Concurrency.
+ * The class assumes that map entries are rarely contended for when writing,
+ * and that not blocking other threads is more important than atomicity.
+ * - a ConcurrentHashMap is used to map keys to weak references, with
+ * all its guarantees.
+ * - there is no automatic pruning.
+ * - see {@link #create(Object)} for the concurrency semantics on entry creation.
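+ *
+ * A minimal usage sketch, assuming a hypothetical per-thread statistics
+ * class {@code ThreadStats} (illustrative only, not part of this change):
+ * <pre>{@code
+ *   WeakReferenceMap<Long, ThreadStats> map = new WeakReferenceMap<>(
+ *       id -> new ThreadStats(),                      // factory for absent keys
+ *       id -> LOG.debug("lost reference to {}", id)); // loss callback
+ *   ThreadStats stats = map.get(Thread.currentThread().getId());
+ * }</pre>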
+ */
+@InterfaceAudience.Private
+public class WeakReferenceMap<K, V> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WeakReferenceMap.class);
+
+ /**
+ * The reference map.
+ */
+ private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();
+
+ /**
+ * Supplier of new instances.
+ */
+ private final Function<? super K, ? extends V> factory;
+
+ /**
+ * Nullable callback invoked when a get() of a key found that the
+ * weak reference had been lost.
+ * The assumption is that this is for logging/stats, which is why
+ * no attempt is made to use the callback as a supplier of a new value.
+ */
+ private final Consumer<? super K> referenceLost;
+ /**
+ * Counter of references lost.
+ */
+ private final AtomicLong referenceLostCount = new AtomicLong();
+
+ /**
+ * Counter of entries created.
+ */
+ private final AtomicLong entriesCreatedCount = new AtomicLong();
+
+ /**
+ * Log to report loss of a reference during the create phase, which
+ * is believed to be a cause of HADOOP-18456.
+ */
+ private final LogExactlyOnce referenceLostDuringCreation = new LogExactlyOnce(LOG);
+
+ /**
+ * instantiate.
+ * @param factory supplier of new instances
+ * @param referenceLost optional callback on lost references.
+ */
+ public WeakReferenceMap(
+ Function<? super K, ? extends V> factory,
+ @Nullable final Consumer<? super K> referenceLost) {
+
+ this.factory = requireNonNull(factory);
+ this.referenceLost = referenceLost;
+ }
+
+ @Override
+ public String toString() {
+ return "WeakReferenceMap{" +
+ "size=" + size() +
+ ", referenceLostCount=" + referenceLostCount +
+ ", entriesCreatedCount=" + entriesCreatedCount +
+ '}';
+ }
+
+ /**
+ * Map size.
+ * @return the current map size.
+ */
+ public int size() {
+ return map.size();
+ }
+
+ /**
+ * Clear all entries.
+ */
+ public void clear() {
+ map.clear();
+ }
+
+ /**
+ * look up the value, returning the possibly empty weak reference
+ * to a value, or null if no value was found.
+ * @param key key to look up
+ * @return null if there is no entry, a weak reference if found
+ */
+ public WeakReference<V> lookup(K key) {
+ return map.get(key);
+ }
+
+ /**
+ * Get the value, creating if needed.
+ * @param key key.
+ * @return an instance.
+ */
+ public V get(K key) {
+ final WeakReference<V> currentWeakRef = lookup(key);
+ // resolve it, after which if not null, we have a strong reference
+ V strongVal = resolve(currentWeakRef);
+ if (strongVal != null) {
+ // all good.
+ return strongVal;
+ }
+
+ // here, either currentWeakRef was null, or its reference was GC'd.
+ if (currentWeakRef != null) {
+ // garbage collection removed the reference.
+
+ // explicitly remove the weak ref from the map if it has not
+ // been updated by this point
+ // this is here just for completeness.
+ map.remove(key, currentWeakRef);
+
+ // log/report the loss.
+ noteLost(key);
+ }
+
+ // create a new value and add it to the map
+ return create(key);
+ }
+
+ /**
+ * Create a new instance under a key.
+ *
+ * The instance is created, added to the map and then the
+ * map value retrieved.
+ * This ensures that the reference returned is that in the map,
+ * even if there is more than one entry being created at the same time.
+ * If that race does occur, it will be logged the first time it happens
+ * for this specific map instance.
+ *
+ * HADOOP-18456 highlighted the risk of a concurrent GC resulting in a null
+ * value being retrieved and so returned.
+ * To prevent this:
+ * <ol>
+ *   <li>A strong reference is retained to the newly created instance
+ *   in a local variable.</li>
+ *   <li>That variable is used after the resolution process, to ensure
+ *   the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
+ *   <li>A check is made for the resolved reference being null, and if so,
+ *   the put() is repeated.</li>
+ * </ol>
+ *
+ * @param key key
+ * @return the created value
+ */
+ public V create(K key) {
+ entriesCreatedCount.incrementAndGet();
+ /*
+ Get a strong ref so even if a GC happens in this method the reference is not lost.
+ It is NOT enough to have a reference in a field, it MUST be used
+ so as to ensure the reference isn't optimized away prematurely.
+ "A reachable object is any object that can be accessed in any potential continuing
+ computation from any live thread."
+ */
+
+ final V strongRef = requireNonNull(factory.apply(key),
+ "factory returned a null instance");
+ V resolvedStrongRef;
+ do {
+ WeakReference<V> newWeakRef = new WeakReference<>(strongRef);
+
+ // put it in the map
+ map.put(key, newWeakRef);
+
+ // get it back from the map
+ WeakReference<V> retrievedWeakRef = map.get(key);
+ // resolve that reference, handling the situation where somehow it was removed from the map
+ // between the put() and the get()
+ resolvedStrongRef = resolve(retrievedWeakRef);
+ if (resolvedStrongRef == null) {
+ referenceLostDuringCreation.warn("reference to %s lost during creation", key);
+ noteLost(key);
+ }
+ } while (resolvedStrongRef == null);
+
+ // note if there was any change in the reference.
+ // as this forces strongRef to be kept in scope
+ if (strongRef != resolvedStrongRef) {
+ LOG.debug("Created instance for key {}: {} overwritten by {}",
+ key, strongRef, resolvedStrongRef);
+ }
+
+ return resolvedStrongRef;
+ }
+
+ /**
+ * Put a value under the key.
+ * A null value can be put, though on a get() call
+ * a new entry is generated.
+ *
+ * @param key key
+ * @param value value
+ * @return any old non-null reference.
+ */
+ public V put(K key, V value) {
+ return resolve(map.put(key, new WeakReference<>(value)));
+ }
+
+ /**
+ * Remove any value under the key.
+ * @param key key
+ * @return any old non-null reference.
+ */
+ public V remove(K key) {
+ return resolve(map.remove(key));
+ }
+
+ /**
+ * Does the map have a valid reference for this object?
+ * No side effects: there is no attempt to notify or clean up
+ * if the reference is null.
+ * @param key key to look up
+ * @return true if there is a valid reference.
+ */
+ public boolean containsKey(K key) {
+ final WeakReference<V> current = lookup(key);
+ return resolve(current) != null;
+ }
+
+ /**
+ * Given a possibly null weak reference, resolve
+ * its value.
+ * @param r reference to resolve
+ * @return the value or null
+ */
+ protected V resolve(WeakReference<V> r) {
+ return r == null ? null : r.get();
+ }
+
+ /**
+ * Prune all null weak references, calling the referenceLost
+ * callback for each one.
+ *
+ * non-atomic and non-blocking.
+ * @return the number of entries pruned.
+ */
+ public int prune() {
+ int count = 0;
+ final Iterator<Map.Entry<K, WeakReference<V>>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ final Map.Entry<K, WeakReference<V>> next = it.next();
+ if (next.getValue().get() == null) {
+ it.remove();
+ count++;
+ noteLost(next.getKey());
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Notify the reference lost callback.
+ * @param key key of lost reference
+ */
+ private void noteLost(final K key) {
+ // increment local counter
+ referenceLostCount.incrementAndGet();
+
+ // and call any notification function supplied in the constructor
+ if (referenceLost != null) {
+ referenceLost.accept(key);
+ }
+ }
+
+ /**
+ * Get count of references lost as detected
+ * during prune() or get() calls.
+ * @return count of references lost
+ */
+ public final long getReferenceLostCount() {
+ return referenceLostCount.get();
+ }
+
+ /**
+ * Get count of entries created on demand.
+ * @return count of entries created
+ */
+ public final long getEntriesCreatedCount() {
+ return entriesCreatedCount.get();
+ }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index 070c8c1fe827a..2065746b76611 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -48,4 +48,11 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
+
+
+
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 1942386afc3c4..b1df9fb1c360b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -119,6 +119,15 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;
+ @BooleanConfigurationValidatorAnnotation(
+ ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
+ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
+ private boolean isExpectHeaderEnabled;
+
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
+ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
+ private boolean accountThrottlingEnabled;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
@@ -275,6 +284,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
+ DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
+ private int accountOperationIdleTimeout;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
+ DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
+ private int analysisPeriod;
+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = DEFAULT_FS_AZURE_USER_AGENT_PREFIX)
private String userAgentId;
@@ -765,6 +782,14 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}
+ public boolean isExpectHeaderEnabled() {
+ return this.isExpectHeaderEnabled;
+ }
+
+ public boolean accountThrottlingEnabled() {
+ return accountThrottlingEnabled;
+ }
+
public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
@@ -807,6 +832,14 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}
+ public int getAccountOperationIdleTimeout() {
+ return accountOperationIdleTimeout;
+ }
+
+ public int getAnalysisPeriod() {
+ return analysisPeriod;
+ }
+
public String getCustomUserAgentPrefix() {
return "abfsdriverV2.1";
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 966a9a5f3c481..058158c5ca1b1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -58,7 +58,6 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
@@ -264,7 +263,6 @@ public void initialize(URI uri, Configuration configuration)
}
}
- AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
boolean isRedirect = abfsConfiguration.isRedirection();
if (isRedirect) {
String abfsUrl = uri.toString();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index ddab155549162..33607a0cae90a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -1109,6 +1109,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
}
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
+ .enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
.enableFlush(abfsConfiguration.isFlushEnabled())
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 32fa802200ac8..e48ad048b21a0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -74,6 +74,11 @@ public final class AbfsHttpConstants {
public static final String HTTP_METHOD_PATCH = "PATCH";
public static final String HTTP_METHOD_POST = "POST";
public static final String HTTP_METHOD_PUT = "PUT";
+ /**
+ * All status codes below HTTP 100 signify an error
+ * and should qualify for retry.
+ */
+ public static final int HTTP_CONTINUE = 100;
// Abfs generic constants
public static final String SINGLE_WHITE_SPACE = " ";
@@ -120,6 +125,9 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_SCOPE = "default:";
public static final String PERMISSION_FORMAT = "%04d";
public static final String SUPER_USER = "$superuser";
+ // The HTTP 100 Continue informational status response code indicates that everything so far
+ // is OK and that the client should continue with the request or ignore it if it is already finished.
+ public static final String HUNDRED_CONTINUE = "100-continue";
public static final char CHAR_FORWARD_SLASH = '/';
public static final char CHAR_EXCLAMATION_POINT = '!';
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index a46c91f5f2198..9070e6b925042 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -35,9 +35,15 @@ public final class ConfigurationKeys {
* path to determine HNS status.
*/
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
+ /**
+ * Enable or disable expect hundred continue header.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
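+ /**
+ * Enable or disable throttling at the account level.
+ * Value: {@value}.
+ */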
+ public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
@@ -116,6 +122,8 @@ public final class ConfigurationKeys {
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
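+ /**
+ * Idle period after which the account-level throttling analyzer timer
+ * is suspended until the next operation. Value: {@value}.
+ */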
+ public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
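+ /**
+ * Frequency, in milliseconds, at which throttling metrics are analyzed.
+ * Value: {@value}.
+ */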
+ public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 29f44bf86a341..e963600239fc4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -32,7 +32,7 @@
public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
-
+ public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
private static final int SIXTY_SECONDS = 60 * 1000;
@@ -98,6 +98,9 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
+ public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
+ public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
+ public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= DelegatingSSLSocketFactory.SSLChannelMode.Default;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index 14159ca2043f5..d5751954a52a8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -76,6 +76,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_COPY_SOURCE = "x-ms-copy-source";
public static final String X_MS_COPY_STATUS_DESCRIPTION = "x-ms-copy-status-description";
public static final String X_MS_COPY_STATUS = "x-ms-copy-status";
+ public static final String EXPECT = "Expect";
public static final String X_MS_METADATA_PREFIX = "x-ms-meta-";
private HttpHeaderConfigurations() {}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java
index aba1d8c1efa2b..147cb6d83cb0b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java
@@ -29,12 +29,33 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
+
+ private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
+
public InvalidAbfsRestOperationException(
final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
- "InvalidAbfsRestOperationException",
+ innerException != null
+ ? innerException.toString()
+ : ERROR_MESSAGE,
innerException);
}
+
+ /**
+ * Adds the retry count along with the exception.
+ * @param innerException The inner exception which is originally caught.
+ * @param retryCount The retry count when the exception was thrown.
+ */
+ public InvalidAbfsRestOperationException(
+ final Exception innerException, int retryCount) {
+ super(
+ AzureServiceErrorCode.UNKNOWN.getStatusCode(),
+ AzureServiceErrorCode.UNKNOWN.getErrorCode(),
+ (innerException != null
+ ? innerException.toString()
+ : ERROR_MESSAGE) + " RetryCount: " + retryCount,
+ innerException);
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
index 7369bfaf56422..57e559a60ec84 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -34,19 +34,22 @@ public enum Mode {
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;
+ private boolean isExpectHeaderEnabled;
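+
+ /**
+ * A sketch of typical construction after this change (argument values
+ * are illustrative only):
+ * <pre>
+ * new AppendRequestParameters(position, 0, length, Mode.APPEND_MODE,
+ *     false, leaseId, abfsConfiguration.isExpectHeaderEnabled());
+ * </pre>
+ */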
public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob,
- final String leaseId) {
+ final String leaseId,
+ final boolean isExpectHeaderEnabled) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
+ this.isExpectHeaderEnabled = isExpectHeaderEnabled;
}
public long getPosition() {
@@ -72,4 +75,12 @@ public boolean isAppendBlob() {
public String getLeaseId() {
return this.leaseId;
}
+
+ public boolean isExpectHeaderEnabled() {
+ return isExpectHeaderEnabled;
+ }
+
+ public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
+ isExpectHeaderEnabled = expectHeaderEnabled;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 67dabf7127fbd..fd0d450bcbfd4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -71,15 +71,16 @@
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.IS_FOLDER_METADATA_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
/**
* AbfsClient.
@@ -103,6 +104,7 @@ public class AbfsClient implements Closeable {
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
+ private final AbfsThrottlingIntercept intercept;
private final ListeningScheduledExecutorService executorService;
@@ -118,6 +120,7 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
+ this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
@@ -223,6 +226,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}
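+ /**
+ * Returns the throttling intercept associated with this client,
+ * either the account-level intercept or the shared singleton.
+ */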
+ AbfsThrottlingIntercept getIntercept() {
+ return intercept;
+ }
+
List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
@@ -421,7 +428,7 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
if (!op.hasResult()) {
throw ex;
}
- if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
String existingResource =
op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE);
if (existingResource != null && existingResource.equals(DIRECTORY)) {
@@ -472,7 +479,7 @@ public AbfsRestOperation createPathBlob(final String path, final boolean isFile,
if (!op.hasResult()) {
throw ex;
}
- if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+ if (!isFile && op.getResult().getStatusCode() == HTTP_CONFLICT) {
// This ensures that we don't throw ex only for existing directory but if a blob exists we throw exception.
tracingContext.setFallbackDFSAppend(tracingContext.getFallbackDFSAppend() + "M");
AbfsRestOperation blobProperty = getBlobProperty(new Path(path), tracingContext);
@@ -619,6 +626,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
+ if (reqParams.isExpectHeaderEnabled()) {
+ requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+ }
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@@ -647,19 +657,33 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
if (url.toString().contains(WASB_DNS_PREFIX)) {
url = changePrefixFromBlobtoDfs(url);
}
- final AbfsRestOperation op = new AbfsRestOperation(
- AbfsRestOperationType.Append,
- this,
- HTTP_METHOD_PUT,
- url,
- requestHeaders,
- buffer,
- reqParams.getoffset(),
- reqParams.getLength(),
- sasTokenForReuse);
+ final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders,
+ buffer,
+ reqParams.getoffset(),
+ reqParams.getLength(),
+ sasTokenForReuse);
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
+ /*
+ If the http response code indicates a user error, we retry
+ the same append request with the expect header disabled.
+ When the "100-continue" header is enabled but a non-HTTP-100 response
+ arrives, the response message might not be set correctly by the server.
+ This handling avoids breaking backward compatibility for anyone who
+ has taken a dependency on the exception message, which is built from
+ the error string present in the response header.
+ */
+ int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+ if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+ LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
+ reqParams.setExpectHeaderEnabled(false);
+ return this.append(path, buffer, reqParams, cachedSasToken,
+ tracingContext);
+ }
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
@@ -667,16 +691,15 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
if (reqParams.isAppendBlob()
&& appendSuccessCheckOp(op, path,
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
- final AbfsRestOperation successOp = new AbfsRestOperation(
- AbfsRestOperationType.Append,
- this,
- HTTP_METHOD_PUT,
- url,
- requestHeaders,
- buffer,
- reqParams.getoffset(),
- reqParams.getLength(),
- sasTokenForReuse);
+ final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
+ AbfsRestOperationType.Append,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders,
+ buffer,
+ reqParams.getoffset(),
+ reqParams.getLength(),
+ sasTokenForReuse);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
@@ -706,7 +729,9 @@ public AbfsRestOperation append(final String blockId, final String path, final b
if (reqParams.getLeaseId() != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
}
-
+ if (reqParams.isExpectHeaderEnabled()) {
+ requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+ }
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCK);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKID, blockId);
@@ -728,7 +753,29 @@ public AbfsRestOperation append(final String blockId, final String path, final b
reqParams.getoffset(),
reqParams.getLength(),
sasTokenForReuse);
- op.execute(tracingContext);
+ try {
+ op.execute(tracingContext);
+ } catch (AzureBlobFileSystemException e) {
+ /*
+ If the http response code indicates a user error, we retry
+ the same append request with the expect header disabled.
+ When the "100-continue" header is enabled but a non-HTTP-100 response
+ arrives, the response message might not be set correctly by the server.
+ This handling avoids breaking backward compatibility for anyone who
+ has taken a dependency on the exception message, which is built from
+ the error string present in the response header.
+ */
+ int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+ if (checkUserErrorBlob(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
+ LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
+ reqParams.setExpectHeaderEnabled(false);
+ return this.append(blockId, path, buffer, reqParams, cachedSasToken,
+ tracingContext, eTag);
+ } else {
+ throw e;
+ }
+ }
return op;
}
@@ -776,6 +823,61 @@ public AbfsRestOperation flush(byte[] buffer, final String path, boolean isClose
return op;
}
+ /**
+ * Returns the rest operation for append.
+ * @param operationType The AbfsRestOperationType.
+ * @param httpMethod specifies the httpMethod.
+ * @param url specifies the url.
+ * @param requestHeaders This includes the list of request headers.
+ * @param buffer The buffer containing the data to be sent.
+ * @param bufferOffset The buffer offset.
+ * @param bufferLength The buffer length.
+ * @param sasTokenForReuse The sasToken.
+ * @return AbfsRestOperation op.
+ */
+ @VisibleForTesting
+ AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
+ final String httpMethod,
+ final URL url,
+ final List<AbfsHttpHeader> requestHeaders,
+ final byte[] buffer,
+ final int bufferOffset,
+ final int bufferLength,
+ final String sasTokenForReuse) {
+ return new AbfsRestOperation(
+ operationType,
+ this,
+ httpMethod,
+ url,
+ requestHeaders,
+ buffer,
+ bufferOffset,
+ bufferLength, sasTokenForReuse);
+ }
+
+ /**
+ * Returns true if the status code lies in the range of user error.
+ * @param responseStatusCode http response status code.
+ * @return True or False.
+ */
+ private boolean checkUserError(int responseStatusCode) {
+ return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
+ && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR);
+ }
+
+ /**
+ * Returns true if the status code lies in the range of user error.
+ * In the case of HTTP_CONFLICT for PutBlockList we fallback to DFS and hence
+ * this retry handling is not needed.
+ * @param responseStatusCode http response status code.
+ * @return True or False.
+ */
+ private boolean checkUserErrorBlob(int responseStatusCode) {
+ return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
+ && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
+ && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+ }
+
// For AppendBlob its possible that the append succeeded in the backend but the request failed.
// However a retry would fail with an InvalidQueryParameterValue
// (as the current offset would be unacceptable).
@@ -1651,4 +1753,9 @@ public void addCallback(ListenableFuture future, FutureCallback callba
AbfsConfiguration getAbfsConfiguration() {
return abfsConfiguration;
}
+
+ @VisibleForTesting
+ protected AccessTokenProvider getTokenProvider() {
+ return tokenProvider;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
index a55c924dd8152..2060de6f14a97 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -20,27 +20,30 @@
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.util.Time.now;
+
class AbfsClientThrottlingAnalyzer {
private static final Logger LOG = LoggerFactory.getLogger(
- AbfsClientThrottlingAnalyzer.class);
- private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
+ AbfsClientThrottlingAnalyzer.class);
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
- * 1000;
+ * 1000;
private static final double SLEEP_DECREASE_FACTOR = .975;
private static final double SLEEP_INCREASE_FACTOR = 1.05;
private int analysisPeriodMs;
@@ -50,49 +53,86 @@ class AbfsClientThrottlingAnalyzer {
private String name = null;
private Timer timer = null;
private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
+ private AtomicLong lastExecutionTime = null;
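+ // Tracks whether the analyzer timer has been suspended because the
+ // account has been idle beyond the configured operation idle timeout.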
+ private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
+ private AbfsConfiguration abfsConfiguration = null;
+ private boolean accountLevelThrottlingEnabled = true;
private AbfsClientThrottlingAnalyzer() {
// hide default constructor
}
- /**
- * Creates an instance of the AbfsClientThrottlingAnalyzer class with
- * the specified name.
- *
- * @param name a name used to identify this instance.
- * @throws IllegalArgumentException if name is null or empty.
- */
- AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
- this(name, DEFAULT_ANALYSIS_PERIOD_MS);
- }
-
/**
* Creates an instance of the AbfsClientThrottlingAnalyzer class with
* the specified name and period.
*
* @param name A name used to identify this instance.
- * @param period The frequency, in milliseconds, at which metrics are
- * analyzed.
+ * @param abfsConfiguration The configuration set.
* @throws IllegalArgumentException If name is null or empty.
* If period is less than 1000 or greater than 30000 milliseconds.
*/
- AbfsClientThrottlingAnalyzer(String name, int period)
- throws IllegalArgumentException {
+ AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
+ throws IllegalArgumentException {
Preconditions.checkArgument(
- StringUtils.isNotEmpty(name),
- "The argument 'name' cannot be null or empty.");
+ StringUtils.isNotEmpty(name),
+ "The argument 'name' cannot be null or empty.");
+ int period = abfsConfiguration.getAnalysisPeriod();
Preconditions.checkArgument(
- period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
- "The argument 'period' must be between 1000 and 30000.");
+ period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
+ "The argument 'period' must be between 1000 and 30000.");
this.name = name;
- this.analysisPeriodMs = period;
+ this.abfsConfiguration = abfsConfiguration;
+ this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
+ this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
+ this.lastExecutionTime = new AtomicLong(now());
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
- new AbfsOperationMetrics(System.currentTimeMillis()));
+ new AbfsOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
- String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
+ String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
this.timer.schedule(new TimerTaskImpl(),
- analysisPeriodMs,
- analysisPeriodMs);
+ analysisPeriodMs,
+ analysisPeriodMs);
+ }
+
+ /**
+ * Resumes the timer if it was stopped.
+ */
+ private void resumeTimer() {
+ blobMetrics = new AtomicReference<AbfsOperationMetrics>(
+ new AbfsOperationMetrics(System.currentTimeMillis()));
+ timer.schedule(new TimerTaskImpl(),
+ analysisPeriodMs,
+ analysisPeriodMs);
+ isOperationOnAccountIdle.set(false);
+ }
+
+ /**
+ * Synchronized method to suspend or resume timer.
+ * @param timerFunctionality resume or suspend.
+ * @param timerTask The timertask object.
+ * @return true or false.
+ */
+ private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
+ TimerTask timerTask) {
+ switch (timerFunctionality) {
+ case RESUME:
+ if (isOperationOnAccountIdle.get()) {
+ resumeTimer();
+ }
+ break;
+ case SUSPEND:
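+ // Cancel the timer once the account has been idle for longer than
+ // the configured operation idle timeout; it is resumed by the next
+ // operation on the account.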
+ if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
+ - lastExecutionTime.get() >= getOperationIdleTimeout())) {
+ isOperationOnAccountIdle.set(true);
+ timerTask.cancel();
+ timer.purge();
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+ return false;
}
/**
@@ -104,12 +144,13 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
- metrics.bytesFailed.addAndGet(count);
- metrics.operationsFailed.incrementAndGet();
+ metrics.addBytesFailed(count);
+ metrics.incrementOperationsFailed();
} else {
- metrics.bytesSuccessful.addAndGet(count);
- metrics.operationsSuccessful.incrementAndGet();
+ metrics.addBytesSuccessful(count);
+ metrics.incrementOperationsSuccessful();
}
+ blobMetrics.set(metrics);
}
/**
@@ -117,6 +158,8 @@ public void addBytesTransferred(long count, boolean isFailedOperation) {
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
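+ // Record this operation and resume the analyzer timer if it was
+ // suspended while the account was idle.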
+ lastExecutionTime.set(now());
+ timerOrchestrator(TimerFunctionality.RESUME, null);
int duration = sleepDuration;
if (duration > 0) {
try {
@@ -134,19 +177,27 @@ int getSleepDuration() {
return sleepDuration;
}
+ int getOperationIdleTimeout() {
+ return abfsConfiguration.getAccountOperationIdleTimeout();
+ }
+
+ AtomicBoolean getIsOperationOnAccountIdle() {
+ return isOperationOnAccountIdle;
+ }
+
private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
int sleepDuration) {
final double percentageConversionFactor = 100;
- double bytesFailed = metrics.bytesFailed.get();
- double bytesSuccessful = metrics.bytesSuccessful.get();
- double operationsFailed = metrics.operationsFailed.get();
- double operationsSuccessful = metrics.operationsSuccessful.get();
+ double bytesFailed = metrics.getBytesFailed().get();
+ double bytesSuccessful = metrics.getBytesSuccessful().get();
+ double operationsFailed = metrics.getOperationsFailed().get();
+ double operationsSuccessful = metrics.getOperationsSuccessful().get();
double errorPercentage = (bytesFailed <= 0)
- ? 0
- : (percentageConversionFactor
- * bytesFailed
- / (bytesFailed + bytesSuccessful));
- long periodMs = metrics.endTime - metrics.startTime;
+ ? 0
+ : (percentageConversionFactor
+ * bytesFailed
+ / (bytesFailed + bytesSuccessful));
+ long periodMs = metrics.getEndTime() - metrics.getStartTime();
double newSleepDuration;
@@ -154,10 +205,10 @@ private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
++consecutiveNoErrorCount;
// Decrease sleepDuration in order to increase throughput.
double reductionFactor =
- (consecutiveNoErrorCount * analysisPeriodMs
- >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
- ? RAPID_SLEEP_DECREASE_FACTOR
- : SLEEP_DECREASE_FACTOR;
+ (consecutiveNoErrorCount * analysisPeriodMs
+ >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
+ ? RAPID_SLEEP_DECREASE_FACTOR
+ : SLEEP_DECREASE_FACTOR;
newSleepDuration = sleepDuration * reductionFactor;
} else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
@@ -176,15 +227,15 @@ private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
double additionalDelayNeeded = 5 * analysisPeriodMs;
if (bytesSuccessful > 0) {
additionalDelayNeeded = (bytesSuccessful + bytesFailed)
- * periodMs
- / bytesSuccessful
- - periodMs;
+ * periodMs
+ / bytesSuccessful
+ - periodMs;
}
// amortize the additional delay needed across the estimated number of
// requests during the next period
newSleepDuration = additionalDelayNeeded
- / (operationsFailed + operationsSuccessful);
+ / (operationsFailed + operationsSuccessful);
final double maxSleepDuration = analysisPeriodMs;
final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;
@@ -201,16 +252,16 @@ private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
- "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
- name,
- (int) bytesFailed,
- (int) bytesSuccessful,
- (int) operationsFailed,
- (int) operationsSuccessful,
- errorPercentage,
- periodMs,
- (int) sleepDuration,
- (int) newSleepDuration));
+ "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
+ name,
+ (int) bytesFailed,
+ (int) bytesSuccessful,
+ (int) operationsFailed,
+ (int) operationsSuccessful,
+ errorPercentage,
+ periodMs,
+ (int) sleepDuration,
+ (int) newSleepDuration));
}
return (int) newSleepDuration;
@@ -238,12 +289,15 @@ public void run() {
}
long now = System.currentTimeMillis();
- if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+ if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
+ return;
+ }
+ if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
- new AbfsOperationMetrics(now));
- oldMetrics.endTime = now;
+ new AbfsOperationMetrics(now));
+ oldMetrics.setEndTime(now);
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
- sleepDuration);
+ sleepDuration);
}
} finally {
if (doWork) {
@@ -252,24 +306,4 @@ public void run() {
}
}
}
-
- /**
- * Stores Abfs operation metrics during each analysis period.
- */
- static class AbfsOperationMetrics {
- private AtomicLong bytesFailed;
- private AtomicLong bytesSuccessful;
- private AtomicLong operationsFailed;
- private AtomicLong operationsSuccessful;
- private long endTime;
- private long startTime;
-
- AbfsOperationMetrics(long startTime) {
- this.startTime = startTime;
- this.bytesFailed = new AtomicLong();
- this.bytesSuccessful = new AtomicLong();
- this.operationsFailed = new AtomicLong();
- this.operationsSuccessful = new AtomicLong();
- }
- }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
index 7303e833418db..3bb225d4be862 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -19,13 +19,17 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection;
+import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+
/**
* Throttles Azure Blob File System read and write operations to achieve maximum
* throughput by minimizing errors. The errors occur when the account ingress
@@ -38,35 +42,101 @@
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput.
*/
-public final class AbfsClientThrottlingIntercept {
+public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
private static final String RANGE_PREFIX = "bytes=";
- private static AbfsClientThrottlingIntercept singleton = null;
- private AbfsClientThrottlingAnalyzer readThrottler = null;
- private AbfsClientThrottlingAnalyzer writeThrottler = null;
- private static boolean isAutoThrottlingEnabled = false;
+ private static AbfsClientThrottlingIntercept singleton; // lazily initialized in initializeSingleton() under LOCK
+ private static final ReentrantLock LOCK = new ReentrantLock();
+ private final AbfsClientThrottlingAnalyzer readThrottler;
+ private final AbfsClientThrottlingAnalyzer writeThrottler;
+ private final String accountName;
- // Hide default constructor
- private AbfsClientThrottlingIntercept() {
- readThrottler = new AbfsClientThrottlingAnalyzer("read");
- writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+ // Constructor for the account-level intercept: one instance per account,
+ // each with its own read and write analyzers.
+ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
+ this.accountName = accountName;
+ this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
- public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
- if (!enableAutoThrottling) {
- return;
- }
+ // Hide default constructor
+ private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
+ // Account name is kept empty as the same instance is shared across all accounts.
+ this.accountName = "";
+ this.readThrottler = setAnalyzer("read", abfsConfiguration);
+ this.writeThrottler = setAnalyzer("write", abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
+ }
+
+ /**
+ * Sets the analyzer for the intercept.
+ * @param name Name of the analyzer.
+ * @param abfsConfiguration The configuration.
+ * @return AbfsClientThrottlingAnalyzer instance.
+ */
+ private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
+ return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
+ }
+
+ /**
+ * Returns the analyzer for read operations.
+ * @return AbfsClientThrottlingAnalyzer for read.
+ */
+ AbfsClientThrottlingAnalyzer getReadThrottler() {
+ return readThrottler;
+ }
+
+ /**
+ * Returns the analyzer for write operations.
+ * @return AbfsClientThrottlingAnalyzer for write.
+ */
+ AbfsClientThrottlingAnalyzer getWriteThrottler() {
+ return writeThrottler;
+ }
+
+ /**
+ * Creates a singleton object of the AbfsClientThrottlingIntercept,
+ * which is shared across all filesystem instances.
+ * @param abfsConfiguration configuration set.
+ * @return singleton object of intercept.
+ */
+ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
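+ // Double-checked locking: re-check under the lock so that only one
+ // singleton instance is ever created.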
- singleton = new AbfsClientThrottlingIntercept();
- isAutoThrottlingEnabled = true;
- LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ LOCK.lock();
+ try {
+ if (singleton == null) {
+ singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
+ LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+ }
+ } finally {
+ LOCK.unlock();
+ }
}
+ return singleton;
+ }
+
+ /**
+ * Updates the metrics for the case when response code signifies throttling
+ * but there are some expected bytes to be sent.
+ * @param isThrottledOperation true if the status code is HTTP_UNAVAILABLE.
+ * @param abfsHttpOperation Used for status code and data transferred.
+ * @return true if the operation is throttled and has some bytes to transfer.
+ */
+ private boolean updateBytesTransferred(boolean isThrottledOperation,
+ AbfsHttpOperation abfsHttpOperation) {
+ return isThrottledOperation && abfsHttpOperation.getExpectedBytesToBeSent() > 0;
}
- static void updateMetrics(AbfsRestOperationType operationType,
- AbfsHttpOperation abfsHttpOperation) {
- if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+ /**
+ * Updates the metrics for successful and failed read and write operations.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsHttpOperation Used for status code and data transferred.
+ */
+ @Override
+ public void updateMetrics(AbfsRestOperationType operationType,
+ AbfsHttpOperation abfsHttpOperation) {
+ if (abfsHttpOperation == null) {
return;
}
@@ -78,11 +148,24 @@ static void updateMetrics(AbfsRestOperationType operationType,
boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+ // If status code is 503, it is considered as a throttled operation.
+ boolean isThrottledOperation = (status == HTTP_UNAVAILABLE);
+
switch (operationType) {
case Append:
contentLength = abfsHttpOperation.getBytesSent();
+ if (contentLength == 0) {
+ /*
+ Signifies the case where we could not update the bytesSent due to
+ throttling but there were some expectedBytesToBeSent.
+ */
+ if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) {
+ LOG.debug("Updating metrics due to throttling for path {}", abfsHttpOperation.getConnUrl().getPath());
+ contentLength = abfsHttpOperation.getExpectedBytesToBeSent();
+ }
+ }
if (contentLength > 0) {
- singleton.writeThrottler.addBytesTransferred(contentLength,
+ writeThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
@@ -90,7 +173,7 @@ static void updateMetrics(AbfsRestOperationType operationType,
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
contentLength = getContentLengthIfKnown(range);
if (contentLength > 0) {
- singleton.readThrottler.addBytesTransferred(contentLength,
+ readThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
@@ -104,21 +187,18 @@ static void updateMetrics(AbfsRestOperationType operationType,
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
*/
- static void sendingRequest(AbfsRestOperationType operationType,
+ @Override
+ public void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters) {
- if (!isAutoThrottlingEnabled) {
- return;
- }
-
switch (operationType) {
case ReadFile:
- if (singleton.readThrottler.suspendIfNecessary()
+ if (readThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
}
break;
case Append:
- if (singleton.writeThrottler.suspendIfNecessary()
+ if (writeThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 63560883d70be..7193c20137c18 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -66,6 +66,9 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMP_LIST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+
/**
* Represents an HTTP operation.
*/
@@ -96,6 +99,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
// metrics
private int bytesSent;
+ private int expectedBytesToBeSent;
private long bytesReceived;
// optional trace enabled metrics
@@ -200,6 +204,10 @@ public int getBytesSent() {
return bytesSent;
}
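+ /**
+ * Bytes the request expected to send; used as a fallback for
+ * throttling metrics when bytesSent is zero because the
+ * "Expect: 100-continue" handshake failed.
+ */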
+ public int getExpectedBytesToBeSent() {
+ return expectedBytesToBeSent;
+ }
+
public long getBytesReceived() {
return bytesReceived;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
- if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
+ int status = result.getStatusCode();
+ /*
+ If, even after exhausting all retries, the http status code has an
+ invalid value, the operation qualifies for InvalidAbfsRestOperationException.
+ All http status codes below the 1xx range (i.e. less than 100)
+ are considered invalid status codes.
+ */
+ if (status < HTTP_CONTINUE) {
+ throw new InvalidAbfsRestOperationException(null, retryCount);
+ }
+
+ if (status >= HttpURLConnection.HTTP_BAD_REQUEST) {
throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
result.getStorageErrorMessage(), null, result);
}
-
LOG.trace("{} REST operation complete", operationType);
}
@@ -278,10 +291,10 @@ public void signRequest(final AbfsHttpOperation httpOperation,
*/
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
- AbfsHttpOperation httpOperation = null;
+ AbfsHttpOperation httpOperation;
try {
// initialize the HTTP request and open the connection
- httpOperation = createNewHttpOperation();
+ httpOperation = createHttpOperation();
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
tracingContext.constructHeader(httpOperation);
@@ -296,8 +309,7 @@ private boolean executeHttpOperation(final int retryCount,
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
- AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
-
+ intercept.sendingRequest(operationType, abfsCounters);
if (hasRequestBody) {
// HttpUrlConnection requires
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
@@ -323,7 +335,7 @@ private boolean executeHttpOperation(final int retryCount,
LOG.warn("Unknown host name: %s. Retrying to resolve the host name...",
hostname);
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
- throw new InvalidAbfsRestOperationException(ex);
+ throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
} catch (IOException ex) {
@@ -332,12 +344,25 @@ private boolean executeHttpOperation(final int retryCount,
}
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
- throw new InvalidAbfsRestOperationException(ex);
+ throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
} finally {
- AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
+ int status = httpOperation.getStatusCode();
+ /*
+        A status below 300 (the 1xx and 2xx ranges) or at or above 500
+        (the 5xx range) should contribute to updating the throttling metrics.
+        Codes below 200 or at or above 500 indicate failed operations, while
+        the 2xx range indicates successful ones. The 3xx range (redirects)
+        and the 4xx range (user errors) should not be part of the
+        throttling backoff computation.
+ */
+ boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
+ || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+ if (updateMetricsResponseCode) {
+ intercept.updateMetrics(operationType, httpOperation);
+ }
}
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
@@ -351,11 +376,6 @@ private boolean executeHttpOperation(final int retryCount,
return true;
}
- @VisibleForTesting
- AbfsHttpOperation createNewHttpOperation() throws IOException {
- return new AbfsHttpOperation(url, method, requestHeaders);
- }
-
@VisibleForTesting
String getMethod() {
return method;
@@ -366,6 +386,15 @@ void setResult(AbfsHttpOperation result) {
this.result = result;
}
+ /**
+   * Creates a new {@link AbfsHttpOperation} from the url, method, and
+   * requestHeaders fields of this AbfsRestOperation.
+ */
+ @VisibleForTesting
+ AbfsHttpOperation createHttpOperation() throws IOException {
+ return new AbfsHttpOperation(url, method, requestHeaders);
+ }
+
/**
* Incrementing Abfs counters with a long value.
*
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
new file mode 100644
index 0000000000000..0ceb4335fcef4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface for ABFS throttling interception.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AbfsThrottlingIntercept {
+
+ /**
+ * Updates the metrics for successful and failed read and write operations.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsHttpOperation Used for status code and data transferred.
+ */
+ void updateMetrics(AbfsRestOperationType operationType,
+ AbfsHttpOperation abfsHttpOperation);
+
+ /**
+ * Called before the request is sent. Client-side throttling
+ * uses this to suspend the request, if necessary, to minimize errors and
+ * maximize throughput.
+ * @param operationType Only applicable for read and write operations.
+ * @param abfsCounters Used for counters.
+ */
+ void sendingRequest(AbfsRestOperationType operationType,
+ AbfsCounters abfsCounters);
+
+}
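For orientation, here is a minimal no-op sketch of this interface. The patch's actual `AbfsNoOpThrottlingIntercept`, which the factory below returns when auto-throttling is disabled, is assumed to behave like this, but its source is not part of this excerpt:

```java
package org.apache.hadoop.fs.azurebfs.services;

// Hypothetical sketch, not the patch's AbfsNoOpThrottlingIntercept itself:
// a stateless intercept that records nothing and never suspends requests.
final class NoOpThrottlingInterceptSketch implements AbfsThrottlingIntercept {

  // Shared instance, mirroring the INSTANCE field the factory refers to.
  static final NoOpThrottlingInterceptSketch INSTANCE =
      new NoOpThrottlingInterceptSketch();

  private NoOpThrottlingInterceptSketch() {
  }

  @Override
  public void updateMetrics(AbfsRestOperationType operationType,
      AbfsHttpOperation abfsHttpOperation) {
    // No-op: throttling is disabled, so nothing is recorded.
  }

  @Override
  public void sendingRequest(AbfsRestOperationType operationType,
      AbfsCounters abfsCounters) {
    // No-op: the request is never suspended.
  }
}
```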
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java
new file mode 100644
index 0000000000000..14f2b8db437bc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.util.WeakReferenceMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory class to get an instance of the throttling intercept class per account.
+ */
+final class AbfsThrottlingInterceptFactory {
+
+ private AbfsThrottlingInterceptFactory() {
+ }
+
+ private static AbfsConfiguration abfsConfig;
+
+ /**
+ * List of references notified of loss.
+ */
+  private static List<String> lostReferences = new ArrayList<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsThrottlingInterceptFactory.class);
+
+ /**
+   * Map which stores an instance of the throttling intercept class per account.
+   */
+  private static WeakReferenceMap<String, AbfsClientThrottlingIntercept>
+      interceptMap = new WeakReferenceMap<>(
+ AbfsThrottlingInterceptFactory::factory,
+ AbfsThrottlingInterceptFactory::referenceLost);
+
+ /**
+ * Returns instance of throttling intercept.
+ * @param accountName Account name.
+ * @return instance of throttling intercept.
+ */
+ private static AbfsClientThrottlingIntercept factory(final String accountName) {
+ return new AbfsClientThrottlingIntercept(accountName, abfsConfig);
+ }
+
+ /**
+ * Reference lost callback.
+ * @param accountName key lost.
+ */
+ private static void referenceLost(String accountName) {
+ lostReferences.add(accountName);
+ }
+
+ /**
+ * Returns an instance of AbfsThrottlingIntercept.
+ *
+   * @param accountName The account for which an instance of the throttling intercept is needed.
+   * @param abfsConfiguration The AbfsConfiguration instance for the account.
+ * @return Instance of AbfsThrottlingIntercept.
+ */
+ static synchronized AbfsThrottlingIntercept getInstance(String accountName,
+ AbfsConfiguration abfsConfiguration) {
+ abfsConfig = abfsConfiguration;
+ AbfsThrottlingIntercept intercept;
+ if (!abfsConfiguration.isAutoThrottlingEnabled()) {
+ return AbfsNoOpThrottlingIntercept.INSTANCE;
+ }
+    // If account-level throttling is disabled, use a static singleton intercept for all accounts
+ if (!abfsConfiguration.accountThrottlingEnabled()) {
+ intercept = AbfsClientThrottlingIntercept.initializeSingleton(
+ abfsConfiguration);
+ } else {
+ // Return the instance from the map
+ intercept = interceptMap.get(accountName);
+ if (intercept == null) {
+ intercept = new AbfsClientThrottlingIntercept(accountName,
+ abfsConfiguration);
+ interceptMap.put(accountName, intercept);
+ }
+ }
+ return intercept;
+ }
+}
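A hedged usage sketch of the factory from request code in the same package (the helper class and method names are illustrative; the patch's AbfsClient is assumed to obtain its intercept through `getInstance` in a similar way):

```java
package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

// Illustrative sketch only (same package, since the factory is
// package-private): how request code might drive a per-account intercept.
final class ThrottlingInterceptUsageSketch {

  private ThrottlingInterceptUsageSketch() {
  }

  static void sendWithThrottling(String accountName,
      AbfsConfiguration abfsConfiguration,
      AbfsRestOperationType operationType,
      AbfsCounters abfsCounters,
      AbfsHttpOperation httpOperation) {
    AbfsThrottlingIntercept intercept =
        AbfsThrottlingInterceptFactory.getInstance(accountName,
            abfsConfiguration);

    // Before sending: the intercept may sleep to back off under throttling.
    intercept.sendingRequest(operationType, abfsCounters);

    // ... the HTTP request would be executed here ...

    // After completion: feed the outcome back into the analyzer.
    intercept.updateMetrics(operationType, httpOperation);
  }
}
```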
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
index 218eecaa45a40..dee1d374d4a04 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
+
/**
* Retry policy used by AbfsClient.
* */
@@ -138,7 +140,7 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final
*/
public boolean shouldRetry(final int retryCount, final int statusCode) {
return retryCount < this.retryCount
- && (statusCode == -1
+ && (statusCode < HTTP_CONTINUE
|| statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
|| statusCode == HttpURLConnection.HTTP_GONE
|| statusCode == HTTP_TOO_MANY_REQUESTS
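The change above broadens the first retry trigger from `statusCode == -1` to any status below the 1xx range, matching the new invalid-status handling in AbfsRestOperation. A hedged standalone sketch of the predicate, with constants inlined (the real policy also checks additional 5xx codes not visible in this excerpt):

```java
// Sketch of the broadened retryable-status check after this change;
// the real ExponentialRetryPolicy checks more codes than shown here.
public class RetryPredicateSketch {

  static boolean isRetryableStatus(int statusCode) {
    return statusCode < 100                                             // below 1xx: no/invalid response, e.g. -1 or 0
        || statusCode == java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT // 408
        || statusCode == java.net.HttpURLConnection.HTTP_GONE           // 410
        || statusCode == 429;                                           // too many requests
  }

  public static void main(String[] args) {
    System.out.println(isRetryableStatus(-1));  // true: no response
    System.out.println(isRetryableStatus(0));   // true: invalid code, now retried
    System.out.println(isRetryableStatus(404)); // false: user error, not retried
  }
}
```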
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
new file mode 100644
index 0000000000000..52428fdd54a19
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+public enum TimerFunctionality {
+ RESUME,
+
+ SUSPEND
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 613274b6d0374..241232ed917dc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -140,6 +140,10 @@ public void setOperation(FSOperationType operation) {
this.opType = operation;
}
+ public int getRetryCount() {
+ return retryCount;
+ }
+
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index dfb7f3f42a5cf..ac77765f9e0b5 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -769,6 +769,26 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to
performance issues.
+### Hundred Continue Options
+
+`fs.azure.account.expect.header.enabled`: This configuration parameter
+specifies whether to send an expect 100-continue header with each append
+request. It is set to true by default. The flag makes the client check with
+the Azure store before uploading a block of data from an output stream,
+allowing the client to throttle back gracefully before actually attempting
+to upload the block. In experiments this provides significant throughput
+improvements under heavy load. For more information, see:
+- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
+
+
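+As a hedged illustration (the key is as documented above; the value shown is
+illustrative), the header can be disabled through the Hadoop configuration:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+public class ExpectHeaderConfigExample {
+  public static void main(String[] args) {
+    // Disable the expect 100-continue header on appends (default: true).
+    Configuration conf = new Configuration();
+    conf.setBoolean("fs.azure.account.expect.header.enabled", false);
+  }
+}
+```
+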
+### Account level throttling Options
+
+`fs.azure.account.operation.idle.timeout`: This value specifies the period of
+inactivity after which the timer for the analyzer (read or write) is paused
+until new requests are made again. The default value is 60 seconds.
+
### HNS Check Options
Config `fs.azure.account.hns.enabled` provides an option to specify whether
the storage account is HNS enabled or not. In case the config is not provided,
@@ -874,6 +894,9 @@ when there are too many writes from the same process.
time. Effectively this will be the threadpool size within the
AbfsOutputStream instance. Set the value in between 1 to 8 both inclusive.
+`fs.azure.analysis.period`: The interval after which the sleep duration is
+recomputed from the analyzed metrics. The default value is 10 seconds.
+
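+A hedged sketch of tuning both account-throttling timings together (the
+values are illustrative and assumed to be in milliseconds):
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+public class ThrottlingTimingConfigExample {
+  public static void main(String[] args) {
+    Configuration conf = new Configuration();
+    // Pause idle analyzer timers after 60 seconds (assumed milliseconds).
+    conf.set("fs.azure.account.operation.idle.timeout", "60000");
+    // Recompute the sleep duration every 10 seconds (assumed milliseconds).
+    conf.set("fs.azure.analysis.period", "10000");
+  }
+}
+```
+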
`fs.azure.write.max.requests.to.queue`: To set the maximum write requests
that can be queued. Memory consumption of AbfsOutputStream instance can be
tuned with this config considering each queued request holds a buffer. Set
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index d6d3742ec8070..cca2a94add2bc 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -25,6 +25,7 @@
import java.util.UUID;
import java.util.concurrent.Callable;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
import org.junit.After;
@@ -42,6 +43,7 @@
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
@@ -73,7 +75,7 @@ public abstract class AbstractAbfsIntegrationTest extends
AbstractAbfsTestWithTimeout {
private static final Logger LOG =
- LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class);
+ LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class);
private boolean isIPAddress;
private NativeAzureFileSystem wasb;
@@ -110,7 +112,7 @@ protected AbstractAbfsIntegrationTest() throws Exception {
if (authType == AuthType.SharedKey) {
assumeTrue("Not set: " + FS_AZURE_ACCOUNT_KEY,
- abfsConfig.get(FS_AZURE_ACCOUNT_KEY) != null);
+ abfsConfig.get(FS_AZURE_ACCOUNT_KEY) != null);
// Update credentials
} else {
assumeTrue("Not set: " + FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME,
@@ -243,6 +245,9 @@ public Hashtable call() throws Exception {
}
}
+ public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
+ return ITestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
+ }
public void loadConfiguredFileSystem() throws Exception {
// disable auto-creation of filesystem
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index 48bf3cc2b00fe..a6202f54378d4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -39,7 +39,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.junit.Assume;
import org.junit.Test;
@@ -266,7 +266,7 @@ public void testCreateNonEmptyBlob() throws IOException {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
AbfsClient client = store.getClient();
- AbfsClient testClient = Mockito.spy(TestAbfsClient.createTestClientFromCurrentContext(
+ AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
client,
fs.getAbfsStore().getAbfsConfiguration()));
store.setClient(testClient);
@@ -291,7 +291,7 @@ public void testValidateGetBlockList() throws Exception {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
AbfsClient client = store.getClient();
- AbfsClient testClient = Mockito.spy(TestAbfsClient.createTestClientFromCurrentContext(
+ AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
client,
fs.getAbfsStore().getAbfsConfiguration()));
store.setClient(testClient);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index b0d0a223af711..f83e605d061e7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -59,7 +59,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.mockito.Mockito;
@@ -1162,7 +1162,7 @@ public void testNegativeScenariosForCreateOverwriteDisabled()
// Get mock AbfsClient with current config
AbfsClient
mockClient
- = TestAbfsClient.getMockAbfsClient(
+ = ITestAbfsClient.getMockAbfsClient(
fs.getAbfsStore().getClient(),
fs.getAbfsStore().getAbfsConfiguration());
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index bc18c85a9e711..a2549aa17b511 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -38,7 +38,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -195,7 +195,7 @@ public void testDeleteIdempotency() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
AbfsClient abfsClient = fs.getAbfsStore().getClient();
- AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext(
+ AbfsClient testClient = ITestAbfsClient.createTestClientFromCurrentContext(
abfsClient,
abfsConfig);
@@ -242,7 +242,7 @@ public void testDeleteIdempotency() throws Exception {
public void testDeleteIdempotencyTriggerHttp404() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
- AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
+ AbfsClient client = ITestAbfsClient.createTestClientFromCurrentContext(
fs.getAbfsStore().getClient(),
this.getConfiguration());
@@ -261,7 +261,7 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
getTestTracingContext(fs, true)));
// mock idempotency check to mimic retried case
- AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
+ AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
fs.getAbfsStore().getClient(),
this.getConfiguration());
AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
@@ -276,10 +276,10 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
// Case 2: Mimic retried case
// Idempotency check on Delete always returns success
- AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
+ AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp(
DeletePath, mockClient, HTTP_METHOD_DELETE,
- TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
- TestAbfsClient.getTestRequestHeaders(mockClient));
+ ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
+ ITestAbfsClient.getTestRequestHeaders(mockClient));
idempotencyRetOp.hardSetResult(HTTP_OK);
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java
index 2234c23412944..d6ec19c6b3066 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemExplictImplicitRename.java
@@ -18,17 +18,12 @@
package org.apache.hadoop.fs.azurebfs;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.apache.hadoop.fs.Path;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
index 132317d88cc4a..acdcf66942ab7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.OperativeEndpoint;
import org.apache.hadoop.fs.azurebfs.services.PrefixMode;
-import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.junit.Assume;
import org.junit.Test;
@@ -165,7 +165,7 @@ public void testVerifyGetBlobProperty() throws Exception {
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
AbfsClient client = store.getClient();
- AbfsClient testClient = Mockito.spy(TestAbfsClient.createTestClientFromCurrentContext(
+ AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
client,
fs.getAbfsStore().getAbfsConfiguration()));
store.setClient(testClient);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
index 2c0bd31bf8eeb..8ef5f1d451670 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
@@ -202,7 +202,7 @@ public void testAppendWithCPK() throws Exception {
// Trying to append with correct CPK headers
AppendRequestParameters appendRequestParameters =
new AppendRequestParameters(
- 0, 0, 5, Mode.APPEND_MODE, false, null);
+ 0, 0, 5, Mode.APPEND_MODE, false, null, true);
byte[] buffer = getRandomBytesArray(5);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
@@ -247,7 +247,7 @@ public void testAppendWithoutCPK() throws Exception {
// Trying to append without CPK headers
AppendRequestParameters appendRequestParameters =
new AppendRequestParameters(
- 0, 0, 5, Mode.APPEND_MODE, false, null);
+ 0, 0, 5, Mode.APPEND_MODE, false, null, true);
byte[] buffer = getRandomBytesArray(5);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index 565eb38c4f70a..9e40f22d231b0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -24,6 +24,9 @@
public final class TestConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
+ public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name";
+ public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+ public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationTestUtil.java
index d1c661eea3b78..56556b1930566 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationTestUtil.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationTestUtil.java
@@ -41,7 +41,7 @@ public static void addAbfsHttpOpProcessResponseMock(final AbfsRestOperation spie
spiedRestOp.getMethod(), spiedRestOp.getRequestHeaders());
AbfsHttpOperation spiedOp = Mockito.spy(op);
return functionRaisingIOE.apply(spiedOp, actualOp);
- }).when(spiedRestOp).createNewHttpOperation();
+ }).when(spiedRestOp).createHttpOperation();
}
public static void setResult(final AbfsRestOperation op, final AbfsHttpOperation result) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
similarity index 62%
rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 8dfef876561f7..e798a4baa36ab 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -20,21 +20,44 @@
import java.io.IOException;
import java.lang.reflect.Field;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
import java.net.URL;
import java.util.List;
+import java.util.Random;
import java.util.regex.Pattern;
import org.junit.Ignore;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
-import static org.assertj.core.api.Assertions.assertThat;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -60,14 +83,19 @@
* Test useragent of abfs client.
*
*/
-public final class TestAbfsClient {
+public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net";
private static final String FS_AZURE_USER_AGENT_PREFIX = "Partner Service";
+ private static final String TEST_PATH = "/testfile";
+ public static final int REDUCED_RETRY_COUNT = 2;
+ public static final int REDUCED_BACKOFF_INTERVAL = 100;
+ public static final int BUFFER_LENGTH = 5;
+ public static final int BUFFER_OFFSET = 0;
private final Pattern userAgentStringPattern;
- public TestAbfsClient(){
+ public ITestAbfsClient() throws Exception {
StringBuilder regEx = new StringBuilder();
regEx.append("^");
regEx.append(APN_VERSION);
@@ -125,7 +153,7 @@ public void verifybBasicInfo() throws Exception {
}
private void verifybBasicInfo(String userAgentStr) {
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string [" + userAgentStr
+ "] should be of the pattern: " + this.userAgentStringPattern.pattern())
.matches(this.userAgentStringPattern)
@@ -155,7 +183,7 @@ public void verifyUserAgentPrefix()
String userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should contain " + FS_AZURE_USER_AGENT_PREFIX)
.contains(FS_AZURE_USER_AGENT_PREFIX);
@@ -165,7 +193,7 @@ public void verifyUserAgentPrefix()
userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should not contain " + FS_AZURE_USER_AGENT_PREFIX)
.doesNotContain(FS_AZURE_USER_AGENT_PREFIX);
}
@@ -181,14 +209,14 @@ public void verifyUserAgentWithoutSSLProvider() throws Exception {
String userAgentStr = getUserAgentString(abfsConfiguration, true);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should contain sslProvider")
.contains(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should not contain sslProvider")
.doesNotContain(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
}
@@ -204,7 +232,7 @@ public void verifyUserAgentClusterName() throws Exception {
String userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should contain cluster name")
.contains(clusterName);
@@ -214,7 +242,7 @@ public void verifyUserAgentClusterName() throws Exception {
userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should not contain cluster name")
.doesNotContain(clusterName)
.describedAs("User-Agent string should contain UNKNOWN as cluster name config is absent")
@@ -232,7 +260,7 @@ public void verifyUserAgentClusterType() throws Exception {
String userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should contain cluster type")
.contains(clusterType);
@@ -242,7 +270,7 @@ public void verifyUserAgentClusterType() throws Exception {
userAgentStr = getUserAgentString(abfsConfiguration, false);
verifybBasicInfo(userAgentStr);
- assertThat(userAgentStr)
+ Assertions.assertThat(userAgentStr)
.describedAs("User-Agent string should not contain cluster type")
.doesNotContain(clusterType)
.describedAs("User-Agent string should contain UNKNOWN as cluster type config is absent")
@@ -308,24 +336,28 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
when(client.getAccessToken()).thenCallRealMethod();
when(client.getSharedKeyCredentials()).thenCallRealMethod();
when(client.createDefaultHeaders()).thenCallRealMethod();
-
+ when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
+ when(client.getIntercept()).thenReturn(
+ AbfsThrottlingInterceptFactory.getInstance(
+ abfsConfig.getAccountName().substring(0,
+ abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
// override baseurl
- client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
+ client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
abfsConfig);
// override baseurl
- client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
+ client = ITestAbfsClient.setAbfsClientField(client, "baseUrl",
baseAbfsClientInstance.getBaseUrl());
// override auth provider
if (currentAuthType == AuthType.SharedKey) {
- client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
+ client = ITestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
new SharedKeyCredentials(
abfsConfig.getAccountName().substring(0,
abfsConfig.getAccountName().indexOf(DOT)),
abfsConfig.getStorageAccountKey()));
} else {
- client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
+ client = ITestAbfsClient.setAbfsClientField(client, "tokenProvider",
abfsConfig.getTokenProvider());
}
@@ -333,7 +365,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
+ "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
+ "UNKNOWN/UNKNOWN) MSFT";
- client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
+ client = ITestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
return client;
}
@@ -397,4 +429,160 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
url,
requestHeaders);
}
+
+ public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
+ return client.getTokenProvider();
+ }
+
+ /**
+ * Test helper method to get random bytes array.
+ * @param length The length of byte buffer.
+ * @return byte buffer.
+ */
+ private byte[] getRandomBytesArray(int length) {
+ final byte[] b = new byte[length];
+ new Random().nextBytes(b);
+ return b;
+ }
+
+ /**
+   * Test to verify that the client retries the append request with the
+   * expect header disabled if the append with the expect header enabled
+   * fails with a 4xx class error.
+   * @throws Exception if the test fails.
+ */
+ @Test
+ public void testExpectHundredContinue() throws Exception {
+ // Get the filesystem.
+ final AzureBlobFileSystem fs = getFileSystem();
+
+ final Configuration configuration = new Configuration();
+ configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+ AbfsClient abfsClient = getClient(fs);
+
+ AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+ configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+ // Update the configuration with reduced retry count and reduced backoff interval.
+ AbfsConfiguration abfsConfig
+ = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+ abfsConfiguration,
+ REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+ // Gets the client.
+ AbfsClient testClient = Mockito.spy(
+ ITestAbfsClient.createTestClientFromCurrentContext(
+ abfsClient,
+ abfsConfig));
+
+ // Create the append request params with expect header enabled initially.
+ AppendRequestParameters appendRequestParameters
+ = new AppendRequestParameters(
+ BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+ AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
+
+ byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
+
+ // Create a test container to upload the data.
+ Path testPath = path(TEST_PATH);
+ fs.create(testPath);
+ String finalTestPath = testPath.toString()
+ .substring(testPath.toString().lastIndexOf("/"));
+
+ // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders
+ = ITestAbfsClient.getTestRequestHeaders(testClient);
+ requestHeaders.add(
+ new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+ if (appendRequestParameters.isExpectHeaderEnabled()) {
+ requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+ }
+
+ // Updates the query parameters.
+ final AbfsUriQueryBuilder abfsUriQueryBuilder
+ = testClient.createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
+ Long.toString(appendRequestParameters.getPosition()));
+
+ // Creates the url for the specified path.
+ URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+ // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+ AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+ AbfsRestOperationType.Append,
+ testClient,
+ HTTP_METHOD_PUT,
+ url,
+ requestHeaders, buffer,
+ appendRequestParameters.getoffset(),
+ appendRequestParameters.getLength(), null));
+
+ AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url,
+ HTTP_METHOD_PUT, requestHeaders));
+
+ // Sets the expect request property if expect header is enabled.
+ if (appendRequestParameters.isExpectHeaderEnabled()) {
+ Mockito.doReturn(HUNDRED_CONTINUE).when(abfsHttpOperation)
+ .getConnProperty(EXPECT);
+ }
+
+ HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+ Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+ .any(), Mockito.any());
+ Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+ Mockito.doReturn(url).when(urlConnection).getURL();
+ Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
+
+ Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
+ .any(), Mockito.any());
+ Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
+
+ // Give user error code 404 when processResponse is called.
+ Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
+ Mockito.doReturn(HTTP_NOT_FOUND).when(abfsHttpOperation).getConnResponseCode();
+ Mockito.doReturn("Resource Not Found")
+ .when(abfsHttpOperation)
+ .getConnResponseMessage();
+
+ // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
+ Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+ .when(abfsHttpOperation)
+ .getConnOutputStream();
+
+ // Sets the httpOperation for the rest operation.
+ Mockito.doReturn(abfsHttpOperation)
+ .when(op)
+ .createHttpOperation();
+
+ // Mock the restOperation for the client.
+ Mockito.doReturn(op)
+ .when(testClient)
+ .getAbfsRestOperationForAppend(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+ Mockito.nullable(int.class), Mockito.nullable(int.class),
+ Mockito.any());
+
+ TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
+ "abcde", FSOperationType.APPEND,
+ TracingHeaderFormat.ALL_ID_FORMAT, null));
+
+ // Check that expect header is enabled before the append call.
+ Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
+ .describedAs("The expect header is not true before the append call")
+ .isTrue();
+
+ intercept(AzureBlobFileSystemException.class,
+ () -> testClient.append(finalTestPath, buffer, appendRequestParameters, null, tracingContext));
+
+ // Verify that the request was not exponentially retried because of user error.
+ Assertions.assertThat(tracingContext.getRetryCount())
+ .describedAs("The retry count is incorrect")
+ .isEqualTo(0);
+
+ // Verify that the same request was retried with expect header disabled.
+ Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
+ .describedAs("The expect header is not false")
+ .isFalse();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
new file mode 100644
index 0000000000000..fe3c2a9892c4c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+@RunWith(Parameterized.class)
+public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
+
+ // Specifies whether getOutputStream() or write() throws IOException.
+ public enum ErrorType {OUTPUTSTREAM, WRITE};
+
+ private static final int HTTP_EXPECTATION_FAILED = 417;
+ private static final int HTTP_ERROR = 0;
+ private static final int ZERO = 0;
+ private static final int REDUCED_RETRY_COUNT = 2;
+ private static final int REDUCED_BACKOFF_INTERVAL = 100;
+ private static final int BUFFER_LENGTH = 5;
+ private static final int BUFFER_OFFSET = 0;
+ private static final String TEST_PATH = "/testfile";
+
+ // Specifies whether the expect header is enabled or not.
+ @Parameterized.Parameter
+ public boolean expectHeaderEnabled;
+
+ // Gives the http response code.
+ @Parameterized.Parameter(1)
+ public int responseCode;
+
+ // Gives the http response message.
+ @Parameterized.Parameter(2)
+ public String responseMessage;
+
+ // Gives the errorType based on the enum.
+ @Parameterized.Parameter(3)
+ public ErrorType errorType;
+
+ // The intercept.
+ private AbfsThrottlingIntercept intercept;
+
+ /*
+ HTTP_OK = 200,
+ HTTP_UNAVAILABLE = 503,
+ HTTP_NOT_FOUND = 404,
+ HTTP_EXPECTATION_FAILED = 417,
+ HTTP_ERROR = 0.
+ */
+ @Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}")
+ public static Iterable<Object[]> parameters() {