Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0de876e
Enable throttling at account level
anmolasrani123 Sep 19, 2022
7faec72
Resolving PR comments
anmolasrani123 Sep 20, 2022
09602b0
Unused import remove
anmolasrani123 Sep 20, 2022
4412a54
Merge branch 'apache:trunk' into HADOOP-18457
anmolanmol1234 Sep 20, 2022
934f00f
Fix for null pointer exception
anmolasrani123 Sep 20, 2022
5cd2a2b
Fixed naming for intercept
anmolasrani123 Sep 21, 2022
487f65b
Addressing PR
anmolasrani123 Sep 21, 2022
35a82da
Merge branch 'apache:trunk' into HADOOP-18457
anmolanmol1234 Sep 21, 2022
e3c5de4
Changes
anmolasrani123 Oct 11, 2022
008e447
Changes for timer
anmolasrani123 Oct 11, 2022
31368bf
Changes for account idle
anmolasrani123 Oct 11, 2022
ae244eb
Enable account level throttling
anmolasrani123 Oct 12, 2022
dde4604
Adding tests
anmolasrani123 Oct 13, 2022
2445dc9
adding tests
anmolasrani123 Oct 14, 2022
09019f2
Changes for throttling with idle time as config
anmolasrani123 Oct 14, 2022
939a6b9
Fixed imports
anmolasrani123 Oct 14, 2022
a124975
Merge branch 'apache:trunk' into HADOOP-18457_temp
anmolanmol1234 Oct 14, 2022
62f0427
Adding back files
anmolasrani123 Oct 14, 2022
cbd96e3
Adding back files
anmolasrani123 Oct 14, 2022
a9b6d6d
Adding back files
anmolasrani123 Oct 14, 2022
47d306d
Individual operation timeout
anmolasrani123 Oct 17, 2022
bdf2686
Changes for operation level idle timeout
anmolasrani123 Oct 18, 2022
996a212
Checkstyle fixes
anmolasrani123 Oct 19, 2022
6121a6d
Interface for throttling
anmolasrani123 Oct 28, 2022
6a6e288
Test added for interface for intercept
anmolasrani123 Oct 31, 2022
7db57ac
Double checking fix
anmolasrani123 Nov 2, 2022
448b236
addressed PR comments
anmolasrani123 Nov 2, 2022
6a39779
Fix try finally
anmolasrani123 Nov 2, 2022
bfbc3c0
Fix for synchronization
anmolasrani123 Nov 3, 2022
98d3110
Added enum for timer functionality
anmolasrani123 Nov 3, 2022
499db63
Merge branch 'apache:trunk' into HADOOP-18457_temp
anmolanmol1234 Nov 3, 2022
29fe5b1
Merge branch 'HADOOP-18457_temp' of https://github.com/anmolanmol1234…
anmolasrani123 Nov 3, 2022
c552e0b
Fix for lost reference
anmolasrani123 Nov 3, 2022
a4764d6
Fix for mock client
anmolasrani123 Nov 3, 2022
ece0552
Addressing PR comments
anmolasrani123 Nov 28, 2022
b3546d3
Made the class final
anmolasrani123 Nov 29, 2022
d4a5a93
Merge branch 'apache:trunk' into HADOOP-18457_temp
anmolanmol1234 Nov 29, 2022
f819e15
Merge branch 'apache:trunk' into HADOOP-18457_temp
anmolanmol1234 Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

// Flag bound to FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED; when the key is not
// set the value falls back to DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED.
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
Expand Down Expand Up @@ -260,6 +264,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;

// Idle timeout bound to FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT. The default
// constant carries an _MS suffix, so the value is presumably in milliseconds.
// No MinValue/MaxValue is declared on this annotation.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;

// Analysis period bound to FS_AZURE_ANALYSIS_PERIOD (default constant is in
// milliseconds by its _MS suffix). No MinValue/MaxValue is declared here.
// NOTE(review): consumers may apply their own range check - confirm before
// allowing arbitrary values.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
private int analysisPeriod;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
MinValue = 0,
DefaultValue = RATE_LIMIT_DEFAULT)
Expand Down Expand Up @@ -694,6 +706,10 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

/**
 * Reports the configured account-level throttling flag.
 *
 * @return the current value of the {@code accountThrottlingEnabled} field.
 */
public boolean accountThrottlingEnabled() {
  return this.accountThrottlingEnabled;
}

public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
Expand Down Expand Up @@ -736,6 +752,14 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}

/**
 * Getter for the configured account operation idle timeout.
 *
 * @return the current value of the {@code accountOperationIdleTimeout} field.
 */
public int getAccountOperationIdleTimeout() {
  return this.accountOperationIdleTimeout;
}

/**
 * Getter for the configured analysis period.
 *
 * @return the current value of the {@code analysisPeriod} field.
 */
public int getAnalysisPeriod() {
  return this.analysisPeriod;
}

public int getRateLimit() {
return rateLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -225,7 +224,6 @@ public void initialize(URI uri, Configuration configuration)
}
}

AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
/** Configuration key for the boolean flag that turns account-level throttling on or off. */
public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";

// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
Expand Down Expand Up @@ -116,6 +117,8 @@ public final class ConfigurationKeys {
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
/** Configuration key for the account operation idle timeout (integer; presumably milliseconds - confirm against the default constant). */
public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
/** Configuration key for the throttling analysis period (integer; presumably milliseconds - confirm against the default constant). */
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
/** Default for the account-level throttling flag: enabled. */
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
/** Default account operation idle timeout, in milliseconds (60 seconds). */
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
/** Default analysis period, in milliseconds (10 seconds). */
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;

public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= DelegatingSSLSocketFactory.SSLChannelMode.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class AbfsClient implements Closeable {
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private final AbfsThrottlingIntercept intercept;

private final ListeningScheduledExecutorService executorService;

Expand All @@ -120,6 +121,7 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);

String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
Expand Down Expand Up @@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}

/**
 * Getter for the throttling intercept held by this client.
 *
 * @return the {@code intercept} field of this instance.
 */
AbfsThrottlingIntercept getIntercept() {
  return this.intercept;
}

List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
Expand Down Expand Up @@ -1277,6 +1283,14 @@ protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}

/**
 * Exposes the configuration object this AbfsClient holds.
 *
 * @return the AbfsConfiguration instance stored in this client.
 */
protected AbfsConfiguration getAbfsConfiguration() {
  return this.abfsConfiguration;
}

public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.util.Time.now;

class AbfsClientThrottlingAnalyzer {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class);
private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
Expand All @@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer {
private String name = null;
private Timer timer = null;
private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
private AtomicLong lastExecutionTime = null;
private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
private AbfsConfiguration abfsConfiguration = null;
private boolean accountLevelThrottlingEnabled = true;

private AbfsClientThrottlingAnalyzer() {
// hide default constructor
}

/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name.
*
* @param name a name used to identify this instance.
* @throws IllegalArgumentException if name is null or empty.
*/
AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
this(name, DEFAULT_ANALYSIS_PERIOD_MS);
}

/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name and period.
*
* @param name A name used to identify this instance.
* @param period The frequency, in milliseconds, at which metrics are
* analyzed.
* @param abfsConfiguration The configuration set.
* @throws IllegalArgumentException If name is null or empty.
* If period is less than 1000 or greater than 30000 milliseconds.
*/
AbfsClientThrottlingAnalyzer(String name, int period)
AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotEmpty(name),
"The argument 'name' cannot be null or empty.");
int period = abfsConfiguration.getAnalysisPeriod();
Preconditions.checkArgument(
period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
"The argument 'period' must be between 1000 and 30000.");
this.name = name;
this.analysisPeriodMs = period;
this.abfsConfiguration = abfsConfiguration;
this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
this.lastExecutionTime = new AtomicLong(now());
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
Expand All @@ -95,6 +94,47 @@ private AbfsClientThrottlingAnalyzer() {
analysisPeriodMs);
}

/**
* Resumes the timer if it was stopped.
*/
private void resumeTimer() {
blobMetrics = new AtomicReference<AbfsOperationMetrics>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use <> in the constructor

new AbfsOperationMetrics(System.currentTimeMillis()));
timer.schedule(new TimerTaskImpl(),
analysisPeriodMs,
analysisPeriodMs);
isOperationOnAccountIdle.set(false);
}

/**
 * Suspends or resumes the analysis timer; synchronized on this instance.
 *
 * RESUME restarts the timer only when the idle flag is currently set.
 * SUSPEND, when account-level throttling is enabled and the time since the
 * last recorded execution has reached the operation idle timeout, sets the
 * idle flag, cancels the supplied task and purges the timer.
 *
 * @param timerFunctionality resume or suspend.
 * @param timerTask the task to cancel on SUSPEND; not used for RESUME.
 * @return true only when SUSPEND actually cancelled the task, false in every
 * other case.
 */
private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
    TimerTask timerTask) {
  switch (timerFunctionality) {
  case SUSPEND: {
    if (!accountLevelThrottlingEnabled) {
      return false;
    }
    final long idleForMs = System.currentTimeMillis() - lastExecutionTime.get();
    if (idleForMs < getOperationIdleTimeout()) {
      return false;
    }
    isOperationOnAccountIdle.set(true);
    timerTask.cancel();
    timer.purge();
    return true;
  }
  case RESUME: {
    if (isOperationOnAccountIdle.get()) {
      resumeTimer();
    }
    return false;
  }
  default:
    return false;
  }
}

/**
* Updates metrics with results from the current storage operation.
*
Expand All @@ -104,19 +144,22 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
metrics.bytesFailed.addAndGet(count);
metrics.operationsFailed.incrementAndGet();
metrics.addBytesFailed(count);
metrics.incrementOperationsFailed();
} else {
metrics.bytesSuccessful.addAndGet(count);
metrics.operationsSuccessful.incrementAndGet();
metrics.addBytesSuccessful(count);
metrics.incrementOperationsSuccessful();
}
blobMetrics.set(metrics);
}

/**
* Suspends the current storage operation, as necessary, to reduce throughput.
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
lastExecutionTime.set(now());
timerOrchestrator(TimerFunctionality.RESUME, null);
int duration = sleepDuration;
if (duration > 0) {
try {
Expand All @@ -134,19 +177,27 @@ int getSleepDuration() {
return sleepDuration;
}

/**
 * Reads the account operation idle timeout from the configuration on every call.
 *
 * @return the value of {@code abfsConfiguration.getAccountOperationIdleTimeout()}.
 */
int getOperationIdleTimeout() {
  final int idleTimeout = abfsConfiguration.getAccountOperationIdleTimeout();
  return idleTimeout;
}

/**
 * Getter for the idle flag.
 *
 * @return the AtomicBoolean instance held by this analyzer (the same object,
 * not a copy).
 */
AtomicBoolean getIsOperationOnAccountIdle() {
  return this.isOperationOnAccountIdle;
}

private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
int sleepDuration) {
final double percentageConversionFactor = 100;
double bytesFailed = metrics.bytesFailed.get();
double bytesSuccessful = metrics.bytesSuccessful.get();
double operationsFailed = metrics.operationsFailed.get();
double operationsSuccessful = metrics.operationsSuccessful.get();
double bytesFailed = metrics.getBytesFailed().get();
double bytesSuccessful = metrics.getBytesSuccessful().get();
double operationsFailed = metrics.getOperationsFailed().get();
double operationsSuccessful = metrics.getOperationsSuccessful().get();
double errorPercentage = (bytesFailed <= 0)
? 0
: (percentageConversionFactor
* bytesFailed
/ (bytesFailed + bytesSuccessful));
long periodMs = metrics.endTime - metrics.startTime;
long periodMs = metrics.getEndTime() - metrics.getStartTime();

double newSleepDuration;

Expand Down Expand Up @@ -238,10 +289,13 @@ public void run() {
}

long now = System.currentTimeMillis();
if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
return;
}
if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
new AbfsOperationMetrics(now));
oldMetrics.endTime = now;
oldMetrics.setEndTime(now);
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
sleepDuration);
}
Expand All @@ -252,24 +306,4 @@ public void run() {
}
}
}

/**
 * Holder for the Abfs operation metrics gathered over one analysis period:
 * byte and operation counters for failed and successful operations, plus the
 * start and end timestamps of the period.
 */
static class AbfsOperationMetrics {
  // Counters all start at zero.
  private AtomicLong bytesFailed = new AtomicLong();
  private AtomicLong bytesSuccessful = new AtomicLong();
  private AtomicLong operationsFailed = new AtomicLong();
  private AtomicLong operationsSuccessful = new AtomicLong();
  // endTime is not set by the constructor and keeps its default of 0 until assigned.
  private long endTime;
  private long startTime;

  /**
   * @param startTime timestamp marking the beginning of the period.
   */
  AbfsOperationMetrics(long startTime) {
    this.startTime = startTime;
  }
}
}
Loading