@@ -363,6 +363,10 @@ public class AbfsConfiguration{
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
private boolean isChecksumValidationEnabled;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

@@ -1240,8 +1244,8 @@ public boolean getRenameResilience() {
return renameResilience;
}

void setRenameResilience(boolean actualResilience) {
renameResilience = actualResilience;
public boolean isPaginatedDeleteEnabled() {
return isPaginatedDeleteEnabled;
}

public boolean getIsChecksumValidationEnabled() {
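
For orientation, a minimal sketch of how the new flag flows from Hadoop configuration into this annotated field. It assumes the two-argument AbfsConfiguration constructor; the account name is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

public class PaginatedDeleteConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration rawConf = new Configuration();
    // Opt in to paginated recursive deletes; the default is false
    // (DEFAULT_ENABLE_PAGINATED_DELETE further down in this diff).
    rawConf.setBoolean("fs.azure.enable.paginated.delete", true);

    // "myaccount.dfs.core.windows.net" is a placeholder account name.
    AbfsConfiguration abfsConf =
        new AbfsConfiguration(rawConf, "myaccount.dfs.core.windows.net");

    // The annotated field is populated from the key above and read via the new getter.
    System.out.println(abfsConf.isPaginatedDeleteEnabled());   // true
  }
}
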
@@ -1077,8 +1077,8 @@ public void delete(final Path path, final boolean recursive,

do {
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
AbfsRestOperation op = client
.deletePath(relativePath, recursive, continuation, tracingContext);
AbfsRestOperation op = client.deletePath(relativePath, recursive,
continuation, tracingContext, getIsNamespaceEnabled(tracingContext));
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
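
The hunk above sits inside the store's continuation loop. A simplified sketch of that pattern with the new deletePath signature follows; it is not the actual store code, which also tracks perf info and handles failures.

// Simplified sketch: a large recursive (paginated) delete may be answered with an
// x-ms-continuation header, and the store keeps reissuing deletePath until the
// service stops returning a continuation token.
void deleteWithContinuation(AbfsClient client, String relativePath,
    boolean recursive, TracingContext tracingContext,
    boolean isNamespaceEnabled) throws AzureBlobFileSystemException {
  String continuation = null;
  do {
    AbfsRestOperation op = client.deletePath(relativePath, recursive,
        continuation, tracingContext, isNamespaceEnabled);
    continuation = op.getResult()
        .getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
  } while (continuation != null && !continuation.isEmpty());
}
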
@@ -121,8 +121,37 @@ public final class AbfsHttpConstants {
public static final char CHAR_EQUALS = '=';
public static final char CHAR_STAR = '*';
public static final char CHAR_PLUS = '+';
public static final String DECEMBER_2019_API_VERSION = "2019-12-12";
public static final String APRIL_2021_API_VERSION = "2021-04-10";

/**
* Specifies the version of the REST protocol used for processing the request.
* Versions should be added to this enum in ascending chronological order,
* i.e. the latest version goes last in the list.
* When upgrading the default version for the whole driver, update getCurrentVersion().
*/
public enum ApiVersion {

DEC_12_2019("2019-12-12"),
APR_10_2021("2021-04-10"),
AUG_03_2023("2023-08-03");

private final String xMsApiVersion;

ApiVersion(String xMsApiVersion) {
this.xMsApiVersion = xMsApiVersion;
}

@Override
public String toString() {
return xMsApiVersion;
}

public static ApiVersion getCurrentVersion() {
return DEC_12_2019;
}
}

@Deprecated
public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString();

/**
* Value that differentiates categories of the http_status.
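
Because the constants are declared oldest to newest, enum ordering doubles as chronological ordering, which is what the delete path relies on later in this diff. A small illustration, using only values defined by the enum:

import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;

public class ApiVersionOrderingSketch {
  public static void main(String[] args) {
    // compareTo() follows declaration order, so it stands in for a date comparison.
    ApiVersion current = ApiVersion.getCurrentVersion();            // DEC_12_2019
    boolean predatesPagination =
        current.compareTo(ApiVersion.AUG_03_2023) < 0;              // true
    // toString() yields the wire value sent in the x-ms-version header.
    System.out.println(ApiVersion.AUG_03_2023 + " " + predatesPagination); // 2023-08-03 true
  }
}
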
@@ -275,6 +275,11 @@ public final class ConfigurationKeys {
/** Add extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";

/**
* Specify whether paginated behavior is to be expected or not in delete path. {@value}
*/
public static final String FS_AZURE_ENABLE_PAGINATED_DELETE = "fs.azure.enable.paginated.delete";

/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";

@@ -133,6 +133,7 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;

/**
@@ -40,6 +40,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn";
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
public static final String QUERY_PARAM_PAGINATED = "paginated";

//query params for SAS
public static final String QUERY_PARAM_SAOID = "saoid";
@@ -100,7 +100,7 @@ public class AbfsClient implements Closeable {

private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private String xMsVersion = DECEMBER_2019_API_VERSION;
private ApiVersion xMsVersion = ApiVersion.getCurrentVersion();
private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final String filesystem;
@@ -122,7 +122,6 @@ public class AbfsClient implements Closeable {
private final ListeningScheduledExecutorService executorService;
private Boolean isNamespaceEnabled;


private boolean renameResilience;

/**
@@ -149,7 +148,7 @@ private AbfsClient(final URL baseUrl,

if (encryptionContextProvider != null) {
this.encryptionContextProvider = encryptionContextProvider;
xMsVersion = APRIL_2021_API_VERSION; // will be default once server change deployed
xMsVersion = ApiVersion.APR_10_2021; // will be default once server change deployed
encryptionType = EncryptionType.ENCRYPTION_CONTEXT;
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
clientProvidedEncryptionKey =
@@ -259,13 +258,27 @@ AbfsThrottlingIntercept getIntercept() {
return intercept;
}

List<AbfsHttpHeader> createDefaultHeaders() {
/**
* Create request headers for Rest Operation using the current API version.
* @return default request headers
*/
@VisibleForTesting
protected List<AbfsHttpHeader> createDefaultHeaders() {
return createDefaultHeaders(this.xMsVersion);
}

/**
* Create request headers for Rest Operation using the specified API version.
* @param xMsVersion the API version to send in the x-ms-version request header
* @return default request headers
*/
private List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString()));
requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET,
UTF_8));
UTF_8));
requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING));
requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent));
return requestHeaders;
@@ -1117,12 +1130,29 @@ public AbfsRestOperation read(final String path,
return op;
}

public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation,
TracingContext tracingContext)
public AbfsRestOperation deletePath(final String path, final boolean recursive,
final String continuation,
TracingContext tracingContext,
final boolean isNamespaceEnabled)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

/*
* If Pagination is enabled and current API version is old,
* use the minimum required version for pagination.
* If Pagination is enabled and current API version is later than minimum required
* version for pagination, use current version only as azure service is backward compatible.
* If pagination is disabled, use the current API version only.
*/
final List<AbfsHttpHeader> requestHeaders = (isPaginatedDelete(recursive,
isNamespaceEnabled) && xMsVersion.compareTo(ApiVersion.AUG_03_2023) < 0)
? createDefaultHeaders(ApiVersion.AUG_03_2023)
: createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();

if (isPaginatedDelete(recursive, isNamespaceEnabled)) {
// Add paginated query parameter
abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE);
}

abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
@@ -1465,6 +1495,14 @@ private synchronized Boolean getIsNamespaceEnabled(TracingContext tracingContext
return isNamespaceEnabled;
}

protected Boolean getIsPaginatedDeleteEnabled() {
return abfsConfiguration.isPaginatedDeleteEnabled();
}

private Boolean isPaginatedDelete(boolean isRecursiveDelete, boolean isNamespaceEnabled) {
return getIsPaginatedDeleteEnabled() && isNamespaceEnabled && isRecursiveDelete;
}

public AuthType getAuthType() {
return authType;
}
@@ -1659,7 +1697,7 @@ protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}

public String getxMsVersion() {
public ApiVersion getxMsVersion() {
return xMsVersion;
}

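
Taken together, a recursive delete on a namespace-enabled account with the flag on produces a request roughly like the sketch below. Account, filesystem, path and token are placeholders; only the parameter and header names come from the constants in this diff.

// Illustrative request shape for a paginated recursive delete (placeholders in <>):
//
//   DELETE https://<account>.dfs.core.windows.net/<filesystem>/<directory>
//          ?paginated=true&recursive=true&continuation=<token-from-previous-response>
//   x-ms-version: 2023-08-03
//
// Non-paginated deletes keep the client's current version (2019-12-12 by default)
// and omit the paginated query parameter.
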
@@ -323,8 +323,9 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
return client.renamePath(path, new Path(path + "_2").toString(),
null, tc, null, false, fs.getIsNamespaceEnabled(tc)).getOp();
case DELETE:
TracingContext testTC = getTestTracingContext(fs, false);
return client.deletePath(path, false, null,
getTestTracingContext(fs, false));
testTC, fs.getIsNamespaceEnabled(testTC));
case GET_ATTR:
return client.getPathStatus(path, true,
getTestTracingContext(fs, false),
@@ -242,7 +242,8 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
"/NonExistingPath",
false,
null,
getTestTracingContext(fs, true)));
getTestTracingContext(fs, true),
fs.getIsNamespaceEnabled(getTestTracingContext(fs, true))));

// mock idempotency check to mimic retried case
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
@@ -269,14 +270,15 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
TracingContext tracingContext = getTestTracingContext(fs, false);
doReturn(tracingContext).when(idempotencyRetOp).createNewTracingContext(any());
when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext))
when(mockClient.deletePath("/NonExistingPath", false, null,
tracingContext, fs.getIsNamespaceEnabled(tracingContext)))
.thenCallRealMethod();

Assertions.assertThat(mockClient.deletePath(
"/NonExistingPath",
false,
null,
tracingContext)
tracingContext, fs.getIsNamespaceEnabled(tracingContext))
.getResult()
.getStatusCode())
.describedAs("Idempotency check reports successful "
@@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.util.List;

import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;

public final class AbfsClientUtils {
@@ -31,4 +33,13 @@ public static void setIsNamespaceEnabled(final AbfsClient abfsClient, final Bool
public static void setEncryptionContextProvider(final AbfsClient abfsClient, final EncryptionContextProvider provider) {
abfsClient.setEncryptionContextProvider(provider);
}

public static String getHeaderValue(List<AbfsHttpHeader> reqHeaders, String headerName) {
for (AbfsHttpHeader header : reqHeaders) {
if (header.getName().equals(headerName)) {
return header.getValue();
}
}
return "";
}
}
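
The new helper lets tests inspect the headers built for a request. A hypothetical assertion a pagination test might make with it, assuming the headers have been captured from a delete call and that the check lives in the same package:

package org.apache.hadoop.fs.azurebfs.services;

import java.util.List;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_VERSION;

class PaginatedDeleteHeaderCheck {
  // Hypothetical helper: given the headers built for a paginated delete, assert
  // that the version override to 2023-08-03 took effect.
  static void assertPaginatedVersion(List<AbfsHttpHeader> requestHeaders) {
    Assertions.assertThat(AbfsClientUtils.getHeaderValue(requestHeaders, X_MS_VERSION))
        .describedAs("paginated delete should request the 2023-08-03 API version")
        .isEqualTo(ApiVersion.AUG_03_2023.toString());
  }
}
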
@@ -413,7 +413,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
return client;
}

private static AbfsClient setAbfsClientField(
static AbfsClient setAbfsClientField(
final AbfsClient client,
final String fieldName,
Object fieldObject) throws Exception {
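
Finally, widening setAbfsClientField to package-private presumably lets the new pagination tests pin a specific API version on a mocked client. A hypothetical sketch of that kind of use; only the two method signatures come from this diff, the surrounding test wiring is assumed.

// Hypothetical test helper (same package as ITestAbfsClient): pin an old API
// version on a mocked client so a paginated delete is forced to upgrade its
// request headers to 2023-08-03.
static AbfsClient mockClientWithOldVersion(AbfsClient realClient,
    AbfsConfiguration abfsConfig) throws Exception {
  AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(realClient, abfsConfig);
  ITestAbfsClient.setAbfsClientField(mockClient, "xMsVersion",
      AbfsHttpConstants.ApiVersion.DEC_12_2019);
  return mockClient;
}
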