Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<suppressions>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]contracts[\\/]services[\\/]AppendRequestParameters.java"/>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,7 @@ private FSDataOutputStream create(Path f, FsPermission permission,
* @return the output stream used to write data into the newly created file .
* @throws IOException if an IO error occurs while attempting to delete the
* path.
*
* @throws FileAlreadyExistsException if file already exists at path.
*/
protected FSDataOutputStream createInternal(Path f, FsPermission permission,
boolean overwrite,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,14 @@ public class AbfsConfiguration{

private final Configuration rawConfig;
private final String accountName;
private final AbfsServiceType fsConfiguredServiceType;
private final boolean isSecure;
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
private String isNamespaceEnabledAccount;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE,
DefaultValue = DEFAULT_FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE)
private String fnsAccountServiceType;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INGRESS_SERVICE_TYPE,
DefaultValue = DEFAULT_FS_AZURE_INGRESS_SERVICE_TYPE)
private String ingressServiceType;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK)
private boolean isDfsToBlobFallbackEnabled;
Expand Down Expand Up @@ -405,11 +398,14 @@ public class AbfsConfiguration{
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
AbfsServiceType fsConfiguredServiceType)
throws IllegalAccessException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
rawConfig, AzureBlobFileSystem.class);
this.accountName = accountName;
this.fsConfiguredServiceType = fsConfiguredServiceType;
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);

Field[] fields = this.getClass().getDeclaredFields();
Expand All @@ -431,22 +427,21 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
}
}

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, IOException {
this(rawConfig, accountName, AbfsServiceType.DFS);
}

public Trilean getIsNamespaceEnabledAccount() {
return Trilean.getTrilean(isNamespaceEnabledAccount);
}

public AbfsServiceType getFnsAccountServiceType() {
if (fnsAccountServiceType.compareToIgnoreCase(AbfsServiceType.DFS.name()) == 0) {
return AbfsServiceType.DFS;
}
return AbfsServiceType.BLOB;
public AbfsServiceType getFsConfiguredServiceType() {
return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
}

public AbfsServiceType getIngressServiceType() {
if (ingressServiceType.compareToIgnoreCase(AbfsServiceType.DFS.name()) == 0) {
return AbfsServiceType.DFS;
}
return AbfsServiceType.BLOB;
return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
}

public boolean isDfsToBlobFallbackEnabled() {
Expand All @@ -458,9 +453,9 @@ public boolean isDfsToBlobFallbackEnabled() {
* @return true if blob client initialization is required, false otherwise
*/
public boolean isBlobClientInitRequired() {
return getFnsAccountServiceType() == AbfsServiceType.BLOB
return getFsConfiguredServiceType() == AbfsServiceType.BLOB
|| getIngressServiceType() == AbfsServiceType.BLOB
|| isDfsToBlobFallbackEnabled;
|| isDfsToBlobFallbackEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ public void initialize(URI uri, Configuration configuration)
tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
this.setWorkingDirectory(this.getHomeDirectory());

abfsStore.validateConfiguredServiceType(getInitTracingContext());

TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
Expand Down Expand Up @@ -1304,7 +1306,7 @@ public void access(final Path path, final FsAction mode) throws IOException {
* Incrementing exists() calls from superclass for statistic collection.
* @param f source path.
* @return true if the path exists.
* @throws IOException if operation fails
* @throws IOException if some issue in checking path.
*/
@Override
public boolean exists(Path f) throws IOException {
Expand Down Expand Up @@ -1442,6 +1444,11 @@ private boolean isAbfsScheme(final String scheme) {
return false;
}

private TracingContext getInitTracingContext() {
return new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.INIT, tracingHeaderFormat, listener);
}

@VisibleForTesting
<T> FileSystemOperation<T> execute(
final String scopeDescription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.UnsupportedAbfsOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
Expand Down Expand Up @@ -164,6 +164,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;

Expand All @@ -177,7 +178,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {

private AbfsClient client;
private AbfsClientHandler clientHandler;
private AbfsServiceType defaultServiceType;
private URI uri;
private String userName;
private String primaryUserGroup;
Expand Down Expand Up @@ -222,7 +222,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
public AzureBlobFileSystemStore(
AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException {
this.uri = abfsStoreBuilder.uri;
this.defaultServiceType = getDefaultServiceType(abfsStoreBuilder.configuration);
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
Expand All @@ -231,7 +230,8 @@ public AzureBlobFileSystemStore(
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());

try {
this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName);
this.abfsConfiguration = new AbfsConfiguration(
abfsStoreBuilder.configuration, accountName, identifyAbfsServiceType());
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
Expand Down Expand Up @@ -293,6 +293,21 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

public void validateConfiguredServiceType(TracingContext tracingContext)
throws AzureBlobFileSystemException {
// Todo: [FnsOverBlob] - Fail FS Init with Blob Endpoint Until FNS over Blob is ready.
if (getConfiguredServiceType() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also inform what is wrong with the config.

}
if (getIsNamespaceEnabled(tracingContext) && getConfiguredServiceType() == AbfsServiceType.BLOB) {
// This could be because of either wrongly configured url or wrongly configured fns service type.
if(identifyAbfsServiceType() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY);
}
throw new InvalidConfigurationValueException(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE);
}
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
Expand Down Expand Up @@ -1776,38 +1791,40 @@ private void initializeClient(URI uri, String fileSystemName,
dfsClient = new AbfsDfsClient(baseUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
populateAbfsClientContext());
blobClient = defaultServiceType == AbfsServiceType.BLOB
|| abfsConfiguration.isBlobClientInitRequired()
blobClient = abfsConfiguration.isBlobClientInitRequired()
? new AbfsBlobClient(baseUrl, creds, abfsConfiguration, tokenProvider,
encryptionContextProvider, populateAbfsClientContext())
: null;
} else {
dfsClient = new AbfsDfsClient(baseUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());
blobClient = defaultServiceType == AbfsServiceType.BLOB
|| abfsConfiguration.isBlobClientInitRequired()
blobClient = abfsConfiguration.isBlobClientInitRequired()
? new AbfsBlobClient(baseUrl, creds, abfsConfiguration, sasTokenProvider,
encryptionContextProvider, populateAbfsClientContext())
: null;
}

this.clientHandler = new AbfsClientHandler(defaultServiceType, dfsClient, blobClient);
this.clientHandler = new AbfsClientHandler(getConfiguredServiceType(),
dfsClient, blobClient);
this.client = clientHandler.getClient();

LOG.trace("AbfsClient init complete");
}

private AbfsServiceType getDefaultServiceType(Configuration conf)
throws UnsupportedAbfsOperationException{
if (conf.get(FS_DEFAULT_NAME_KEY).contains(AbfsServiceType.BLOB.toString().toLowerCase())) {
// Todo: [FnsOverBlob] return "AbfsServiceType.BLOB" once the code is ready for Blob Endpoint Support.
throw new UnsupportedAbfsOperationException(
"Blob Endpoint Support is not yet implemented. Please use DFS Endpoint.");
private AbfsServiceType identifyAbfsServiceType() {
if (uri.toString().contains(FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME)) {
return AbfsServiceType.BLOB;
}
// Incase of DFS Domain name or any other custom endpoint, the service
// type is to be identified as default DFS.
return AbfsServiceType.DFS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can have get(uri) in AbfsServiceType, which returns back the relevant enum. Also, we can remove FileSystemUriSchemes#ABFS_BLOB_DOMAIN_NAME, and ABFS_DFS_DOMAIN_NAME

}

private AbfsServiceType getConfiguredServiceType() {
return abfsConfiguration.getFsConfiguredServiceType();
}

/**
* Populate a new AbfsClientContext instance with the desired properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@
* </ol>
*/
public enum AbfsServiceType {
DFS,
BLOB
DFS(".dfs.core.windows.net"),
BLOB(".blob.core.windows.net");

private final String endpointDnsSuffix;

AbfsServiceType(String endpointDnsSuffix) {
this.endpointDnsSuffix = endpointDnsSuffix;
}

@Override
public String toString() {
return endpointDnsSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
WRITE("WR");
WRITE("WR"),
INIT("IN");

private final String opCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
public final class FileSystemConfigurations {

public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
public static final String DEFAULT_FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE = "DFS";
public static final String DEFAULT_FS_AZURE_INGRESS_SERVICE_TYPE = "DFS";
public static final boolean DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = false;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ public final class FileSystemUriSchemes {
public static final String WASB_SECURE_SCHEME = "wasbs";
public static final String WASB_DNS_PREFIX = "blob";

public static final String ABFS_DFS_DOMAIN_NAME = "dfs.core.windows.net";
public static final String ABFS_BLOB_DOMAIN_NAME = "blob.core.windows.net";

private FileSystemUriSchemes() {}
}

This file was deleted.

Loading