Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
<suppressions>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.Field;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
Expand All @@ -86,13 +88,19 @@ public class AbfsConfiguration{

private final Configuration rawConfig;
private final String accountName;
// Service type identified from URL used to initialize FileSystem.
private final AbfsServiceType fsConfiguredServiceType;
private final boolean isSecure;
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
private String isNamespaceEnabledAccount;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK)
private boolean isDfsToBlobFallbackEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
DefaultValue = -1)
private int writeMaxConcurrentRequestCount;
Expand Down Expand Up @@ -387,11 +395,14 @@ public class AbfsConfiguration{
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
AbfsServiceType fsConfiguredServiceType)
throws IllegalAccessException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
rawConfig, AzureBlobFileSystem.class);
this.accountName = accountName;
this.fsConfiguredServiceType = fsConfiguredServiceType;
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);

Field[] fields = this.getClass().getDeclaredFields();
Expand All @@ -413,11 +424,76 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
}
}

/**
* Convenience constructor that delegates to the three-argument constructor,
* passing {@link AbfsServiceType#DFS} as the filesystem-configured service type.
* @param rawConfig configuration handed through unchanged to the delegate constructor
* @param accountName account name handed through unchanged to the delegate constructor
* @throws IllegalAccessException propagated from the delegate constructor
* @throws IOException propagated from the delegate constructor
*/
public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, IOException {
this(rawConfig, accountName, AbfsServiceType.DFS);
}

/**
 * Looks up {@code FS_AZURE_ACCOUNT_IS_HNS_ENABLED} through
 * {@link #getString(String, String)}, using the value held in the
 * {@code isNamespaceEnabledAccount} field as the default, and converts the
 * resulting string into a {@link Trilean}.
 * @return the namespace-enabled setting as a {@link Trilean}.
 */
public Trilean getIsNamespaceEnabledAccount() {
  final String hnsSetting =
      getString(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, isNamespaceEnabledAccount);
  return Trilean.getTrilean(hnsSetting);
}

/**
 * Resolves the service type this filesystem should use.
 * A value configured for FNS accounts under
 * "fs.azure.fns.account.service.type" wins; when that key is not set, the
 * service type identified from the URL used to initialize the filesystem
 * is returned instead.
 * @return the effective service type.
 */
public AbfsServiceType getFsConfiguredServiceType() {
  // The URL-derived type only acts as the default for the config lookup.
  final AbfsServiceType typeFromUrl = this.fsConfiguredServiceType;
  return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, typeFromUrl);
}

/**
 * Reads the service type explicitly configured for FNS accounts, i.e. the
 * override for the service type identified from the URL used to initialize
 * the filesystem. No fallback is supplied to the lookup.
 * @return the configured service type, or whatever {@code getEnum} yields
 *         for a {@code null} default when nothing is configured.
 */
public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
  // NOTE(review): a null default is handed to getEnum; confirm getEnum
  // copes with a null default when the key is actually set.
  final AbfsServiceType fnsOverride =
      getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, null);
  return fnsOverride;
}

/**
 * Resolves the service type used for ingress operations, irrespective of
 * the account type. When {@code FS_AZURE_INGRESS_SERVICE_TYPE} is not
 * configured, the value of {@link #getFsConfiguredServiceType()} is used.
 * @return the ingress service type.
 */
public AbfsServiceType getIngressServiceType() {
  // Resolve the filesystem-level type first; it is the lookup's default.
  final AbfsServiceType fsServiceType = getFsConfiguredServiceType();
  return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, fsServiceType);
}

/**
 * Tells whether traffic should be moved from DFS to Blob.
 * This is relevant when the service type is DFS and operations run into
 * compatibility issues. The backing field is annotated with the
 * {@code FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK} configuration key.
 * @return {@code true} if the DFS-to-Blob fallback is enabled.
 */
public boolean isDfsToBlobFallbackEnabled() {
  return this.isDfsToBlobFallbackEnabled;
}

/**
 * Verifies that the configured service type is acceptable for the kind of
 * account in use. An HNS-enabled account may not use BLOB, neither through
 * the FNS service-type override nor through the URL used to initialize the
 * filesystem. For the time being any effective BLOB service type is
 * rejected outright.
 * @param isHNSEnabled whether HNS is enabled for the account.
 * @throws InvalidConfigurationValueException if the service type is invalid.
 */
public void validateConfiguredServiceType(boolean isHNSEnabled)
    throws InvalidConfigurationValueException {
  // Todo: [FnsOverBlob] - Drop this guard once FNS over Blob is ready;
  // until then filesystem init with a Blob endpoint must fail.
  if (AbfsServiceType.BLOB == getFsConfiguredServiceType()) {
    throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
        "Blob Endpoint Support not yet available");
  }

  // The remaining rules only constrain HNS-enabled accounts.
  if (!isHNSEnabled) {
    return;
  }

  // Explicit override to BLOB is not allowed on an HNS account.
  if (AbfsServiceType.BLOB == getConfiguredServiceTypeForFNSAccounts()) {
    throw new InvalidConfigurationValueException(
        FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account");
  }

  // Neither is a Blob endpoint URL, even if the override says otherwise.
  if (AbfsServiceType.BLOB == fsConfiguredServiceType) {
    throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
        "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
  }
}

/**
* Gets the Azure Storage account name corresponding to this instance of configuration.
* @return the Azure Storage account name
Expand Down Expand Up @@ -458,6 +534,7 @@ public String get(String key) {
* Returns the account-specific value if it exists, then looks for an
* account-agnostic value.
* @param key Account-agnostic configuration key
* @param defaultValue Value returned if none is configured
* @return value if one exists, else the default value
*/
public String getString(String key, String defaultValue) {
Expand Down Expand Up @@ -502,7 +579,7 @@ public int getInt(String key, int defaultValue) {
* looks for an account-agnostic value.
* @param key Account-agnostic configuration key
* @return value in String form if one exists, else null
* @throws IOException if parsing fails.
*/
public String getPasswordString(String key) throws IOException {
char[] passchars = rawConfig.getPassword(accountConf(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -111,13 +112,16 @@
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
Expand Down Expand Up @@ -215,6 +219,23 @@ public void initialize(URI uri, Configuration configuration)

TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);

/*
* Validate that the service type configured in the URI is valid for the account type in use.
* An HNS account cannot have a Blob endpoint URI.
*/
try {
abfsConfiguration.validateConfiguredServiceType(
tryGetIsNamespaceEnabled(new TracingContext(tracingContext)));
} catch (InvalidConfigurationValueException ex) {
LOG.debug("File system configured with Invalid Service Type", ex);
throw ex;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Failed to determine account type for service type validation", ex);
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

// Create the file system if it does not exist.
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
try {
Expand All @@ -232,10 +253,7 @@ public void initialize(URI uri, Configuration configuration)
*/
if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
abfsConfiguration))
&& !getIsNamespaceEnabled(
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat,
listener))) {
&& !getIsNamespaceEnabled(new TracingContext(tracingContext))) {
/*
* Close the filesystem gracefully before throwing exception. Graceful close
* will ensure that all resources are released properly.
Expand Down Expand Up @@ -1390,6 +1408,34 @@ private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext)
}
}

/**
 * Determines whether namespace is enabled on the storage account, tolerating
 * certain request failures. A failure whose status code lies strictly
 * between 400 and 500 is taken to mean the account is HNS; every other
 * failure is passed on to the caller.
 * @param tracingContext tracing context
 * @return true if namespace is enabled (or inferred to be), false otherwise.
 * @throws AzureBlobFileSystemException if the failure allows no inference.
 */
private boolean tryGetIsNamespaceEnabled(TracingContext tracingContext)
    throws AzureBlobFileSystemException {
  try {
    return getIsNamespaceEnabled(tracingContext);
  } catch (AbfsRestOperationException restFailure) {
    /*
     * Only non-400 failures surface here as exceptions.
     * 4xx (other than 400): treated as an HNS account.
     * 5xx: nothing can be inferred.
     * Network errors: status code is -1, nothing can be inferred.
     */
    final int httpStatus = restFailure.getStatusCode();
    final boolean isNon400UserError =
        httpStatus > HTTP_BAD_REQUEST && httpStatus < HTTP_INTERNAL_ERROR;
    if (!isNon400UserError) {
      throw restFailure;
    }
    LOG.debug("getNamespace failed with non 400 user error", restFailure);
    statIncrement(ERROR_IGNORED);
    return true;
  }
}

private boolean fileSystemExists() throws IOException {
LOG.debug(
"AzureBlobFileSystem.fileSystemExists uri: {}", uri);
Expand Down Expand Up @@ -1650,7 +1696,7 @@ AbfsDelegationTokenManager getDelegationTokenManager() {
@VisibleForTesting
boolean getIsNamespaceEnabled(TracingContext tracingContext)
    throws AzureBlobFileSystemException {
  // Delegate to the store obtained through the accessor. The stale direct
  // field access that preceded this line was removed: it made the statement
  // below unreachable, which Java rejects at compile time.
  return getAbfsStore().getIsNamespaceEnabled(tracingContext);
}

/**
Expand Down
Loading