Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
e2ffb8c
ABFS: Added changes for expect hundred continue header with append re…
anmolasrani123 Feb 28, 2022
d110408
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
a29faa8
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
dacfde0
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
c14f458
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
7e40f07
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
ef67598
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Mar 4, 2022
93a77a7
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 30, 2022
9b43316
ABFS: Added changes for expect hundred continue header with append re…
anmolasrani123 Feb 28, 2022
899b40b
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
2af317d
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
cc9fcdb
ABFS: Added changes for expect hundred continue
anmolasrani123 Mar 2, 2022
9fc9c99
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
56eda26
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 4, 2022
091f1d4
Added retry mechanism for certain HTTP errors
anmolasrani123 Mar 30, 2022
3709619
Fix trunk conflict
anmolasrani123 May 12, 2022
fe33f93
Merge branch 'HADOOP-18146' of https://github.com/anmolanmol1234/hado…
anmolasrani123 Jul 20, 2022
94397c7
Added config details in md file
anmolasrani123 Jul 20, 2022
9c8f7d0
Merge branch 'trunk' into HADOOP-18146
anmolanmol1234 Jul 20, 2022
5f26061
Changing class modifier
anmolasrani123 Jul 20, 2022
f2ffb23
Merge branch 'HADOOP-18146' of https://github.com/anmolanmol1234/hado…
anmolasrani123 Jul 20, 2022
0f18d94
Spot bugs and checkstyle fixes
anmolasrani123 Aug 2, 2022
83fbd8c
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Aug 2, 2022
58c1123
remove unused imports
anmolasrani123 Aug 2, 2022
aab3128
Fix imports
anmolasrani123 Aug 22, 2022
d13f8bd
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Sep 19, 2022
478021a
Separate out account throttling
anmolasrani123 Sep 19, 2022
9fbb4de
Documentation added
anmolasrani123 Oct 25, 2022
7c43202
Formatting
anmolasrani123 Oct 25, 2022
75a3332
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Oct 25, 2022
3fc18b9
Addressed PR comments
anmolasrani123 Oct 28, 2022
aaca1c1
Addressed PR comments
anmolasrani123 Oct 28, 2022
135e04f
Addressed PR comments
anmolasrani123 Oct 28, 2022
eaf9dfc
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Nov 29, 2022
2a1834f
Addressed PR comments
anmolasrani123 Nov 29, 2022
3f37058
Merge branch 'HADOOP-18146' of https://github.com/anmolanmol1234/hado…
anmolasrani123 Nov 29, 2022
443e263
Fix for exception
anmolasrani123 Nov 29, 2022
e694264
Merge branch 'trunk' into HADOOP-18146
anmolanmol1234 Dec 5, 2022
7961d08
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Dec 5, 2022
05df41c
Update AbfsConfiguration.java
anmolanmol1234 Dec 5, 2022
457fda0
Changes for exception handling
anmolasrani123 Dec 6, 2022
f2e6f52
String correction
anmolasrani123 Dec 7, 2022
fd006bd
Tests for hundred continue
anmolasrani123 Dec 15, 2022
baf9ec7
Add tests for 100 continue
anmolasrani123 Dec 15, 2022
fe8deea
Add tests for hundred continue
anmolasrani123 Dec 15, 2022
cafd409
Parameters for test
anmolasrani123 Dec 19, 2022
f17c15a
Tests for expect header
anmolasrani123 Dec 19, 2022
61138e9
Update metrics fix
anmolasrani123 Dec 20, 2022
ac3e973
Metric update changes
anmolasrani123 Dec 20, 2022
36ec260
Tests for metric updation verification
anmolasrani123 Dec 20, 2022
8283ef2
Update md file
anmolasrani123 Dec 20, 2022
9611999
Remove unused imports
anmolasrani123 Dec 20, 2022
93003b2
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Dec 20, 2022
7899d1c
Checkstyle fixes
anmolasrani123 Dec 20, 2022
5398f2a
Checkstyle fixes
anmolasrani123 Dec 21, 2022
796b774
PR comments addressing
anmolasrani123 Dec 22, 2022
115b0b6
PR comments
anmolasrani123 Dec 22, 2022
db89c78
remove stter for connection
anmolasrani123 Dec 23, 2022
0fb9067
Update AbfsClient.java
anmolanmol1234 Dec 26, 2022
f3e2e14
Addressing PR comments
anmolanmol1234 Mar 16, 2023
51bdece
String fix
anmolanmol1234 Mar 16, 2023
83f14fb
Merge branch 'apache:trunk' into HADOOP-18146
anmolanmol1234 Mar 16, 2023
c3268dc
Remove unused imports
anmolanmol1234 Mar 16, 2023
675687c
Import fix
anmolanmol1234 Mar 16, 2023
99a9377
Checkstyle fixes
anmolanmol1234 Mar 16, 2023
e210b04
Build fixed
anmolanmol1234 Mar 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@
<!-- allow tests to use _ for ordering. -->
<suppress checks="MethodName"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
private boolean isExpectHeaderEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
Expand Down Expand Up @@ -689,6 +694,10 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}

public boolean isExpectHeaderEnabled() {
return this.isExpectHeaderEnabled;
}

public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withWriteBufferSize(bufferSize)
.enableFlush(abfsConfiguration.isFlushEnabled())
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_SCOPE = "default:";
public static final String PERMISSION_FORMAT = "%04d";
public static final String SUPER_USER = "$superuser";
public static final String HUNDRED_CONTINUE = "100-continue";

public static final char CHAR_FORWARD_SLASH = '/';
public static final char CHAR_EXCLAMATION_POINT = '!';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class ConfigurationKeys {
* path to determine HNS status.
*/
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public final class FileSystemConfigurations {

public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";

public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
public static final String EXPECT = "Expect";
public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
public static final String X_MS_ENCRYPTION_ALGORITHM = "x-ms-encryption-algorithm";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
public InvalidAbfsRestOperationException(
final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: "InvalidAbfsRestOperationException",
innerException);
}
public InvalidAbfsRestOperationException(
final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: "InvalidAbfsRestOperationException",
innerException);
}

public InvalidAbfsRestOperationException(final Exception innerException, int retryCount) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
innerException != null
? innerException.toString()
: "InvalidAbfsRestOperationException" + "RetryCount: " + String.valueOf(retryCount),
innerException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,21 @@ public enum Mode {
private final Mode mode;
private final boolean isAppendBlob;
private final String leaseId;
private boolean isExpectHeaderEnabled;

public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob,
final String leaseId) {
final String leaseId, boolean isExpectHeaderEnabled) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
}

public long getPosition() {
Expand All @@ -72,4 +74,12 @@ public boolean isAppendBlob() {
public String getLeaseId() {
return this.leaseId;
}

public boolean getIsExpectHeaderEnabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to isExpectHeaderEnabled() for consistency with the AbfsOutputStreamContext property

return isExpectHeaderEnabled;
}

public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
isExpectHeaderEnabled = expectHeaderEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
Expand Down Expand Up @@ -652,6 +653,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
if (reqParams.getIsExpectHeaderEnabled()) {
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
}
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (reqParams.getLeaseId() != null) {
Expand Down Expand Up @@ -688,6 +692,17 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
try {
op.execute(tracingContext);
} catch (AzureBlobFileSystemException e) {
/*
If the http response code indicates a user error we retry the same append request with expect header disabled.
When "100-continue" header is enabled but a non Http 100 response comes, JDK fails to provide all response headers.
This handling is to avoid breaking of backward compatibility if someone has taken dependency on the exception message,
which is created using the error string present in the response header.
*/
if (checkUserError(e) && reqParams.getIsExpectHeaderEnabled()) {
reqParams.setExpectHeaderEnabled(false);
return this.append(path, buffer, reqParams, cachedSasToken,
tracingContext);
}
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
Expand All @@ -714,6 +729,18 @@ && appendSuccessCheckOp(op, path,
return op;
}

/**
* Returns true if the status code lies in the range of user error
* @param e Exception caught
* @return True or False
*/
private boolean checkUserError(AzureBlobFileSystemException e) {
return ((AbfsRestOperationException) e).getStatusCode()
>= HttpURLConnection.HTTP_BAD_REQUEST
&& ((AbfsRestOperationException) e).getStatusCode()
< HttpURLConnection.HTTP_INTERNAL_ERROR;
}

// For AppendBlob its possible that the append succeeded in the backend but the request failed.
// However a retry would fail with an InvalidQueryParameterValue
// (as the current offset would be unacceptable).
Expand Down Expand Up @@ -1273,6 +1300,14 @@ protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}

/**
* Getter for abfsConfiguration from AbfsClient.
* @return AbfsConfiguration instance
*/
protected AbfsConfiguration getAbfsConfiguration() {
return abfsConfiguration;
}

public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
// accompanying statusCode
this.bytesSent = length;
outputStream.write(buffer, offset, length);
} catch (IOException e) {
this.bytesSent = length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is going to swallow all IOEs raised. unless i've misreasd something, the IOE must be rethrown

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and it will double count if write() raises an IOE

} finally {
if (this.isTraceEnabled) {
this.sendRequestTimeMs = elapsedTimeMs(startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private boolean disableOutputStreamFlush;
private boolean enableSmallWriteOptimization;
private boolean isAppendBlob;
private boolean isExpectHeaderEnabled;
private volatile IOException lastError;

private long lastFlushOffset;
Expand Down Expand Up @@ -133,6 +134,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.position = abfsOutputStreamContext.getPosition();
this.closed = false;
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.isExpectHeaderEnabled = abfsOutputStreamContext.isExpectHeaderEnabled();
this.disableOutputStreamFlush = abfsOutputStreamContext
.isDisableOutputStreamFlush();
this.enableSmallWriteOptimization
Expand Down Expand Up @@ -327,7 +329,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
* leaseId - The AbfsLeaseId for this request.
*/
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false, leaseId);
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
AbfsRestOperation op =
client.append(path, blockUploadData.toByteArray(), reqParams,
cachedSasToken.get(), new TracingContext(tracingContext));
Expand Down Expand Up @@ -573,7 +575,7 @@ private void writeAppendBlobCurrentBufferToService() throws IOException {
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
bytesLength, APPEND_MODE, true, leaseId);
bytesLength, APPEND_MODE, true, leaseId, isExpectHeaderEnabled);
AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
cachedSasToken.get(), new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {

private boolean enableFlush;

private boolean enableExpectHeader;

private boolean enableSmallWriteOptimization;

private boolean disableOutputStreamFlush;
Expand Down Expand Up @@ -78,6 +80,11 @@ public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
return this;
}

public AbfsOutputStreamContext enableExpectHeader(final boolean enableExpectHeader) {
this.enableExpectHeader = enableExpectHeader;
return this;
}

public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
this.enableSmallWriteOptimization = enableSmallWriteOptimization;
return this;
Expand Down Expand Up @@ -184,6 +191,10 @@ public boolean isEnableFlush() {
return enableFlush;
}

public boolean isExpectHeaderEnabled() {
return enableExpectHeader;
}

public boolean isDisableOutputStreamFlush() {
return disableOutputStreamFlush;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ private boolean executeHttpOperation(final int retryCount,
LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
hostname);
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex);
throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
} catch (IOException ex) {
Expand All @@ -312,7 +312,7 @@ private boolean executeHttpOperation(final int retryCount,
}

if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex);
throw new InvalidAbfsRestOperationException(ex, retryCount);
}

return false;
Expand Down
6 changes: 6 additions & 0 deletions hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to
performance issues.

### <a name="100continueconfigoptions"></a> Hundred Continue Options

`fs.azure.account.expect.header.enabled`: This configuration parameter is used
to specify whether you wish to send a expect 100 continue header with each
append request or not. It is configured to true by default.

### <a name="hnscheckconfigoptions"></a> HNS Check Options
Config `fs.azure.account.hns.enabled` provides an option to specify whether
the storage account is HNS enabled or not. In case the config is not provided,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void testAppendWithCPK() throws Exception {
// Trying to append with correct CPK headers
AppendRequestParameters appendRequestParameters =
new AppendRequestParameters(
0, 0, 5, Mode.APPEND_MODE, false, null);
0, 0, 5, Mode.APPEND_MODE, false, null, true);
byte[] buffer = getRandomBytesArray(5);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
Expand Down Expand Up @@ -248,7 +248,7 @@ public void testAppendWithoutCPK() throws Exception {
// Trying to append without CPK headers
AppendRequestParameters appendRequestParameters =
new AppendRequestParameters(
0, 0, 5, Mode.APPEND_MODE, false, null);
0, 0, 5, Mode.APPEND_MODE, false, null, true);
byte[] buffer = getRandomBytesArray(5);
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
when(client.getAccessToken()).thenCallRealMethod();
when(client.getSharedKeyCredentials()).thenCallRealMethod();
when(client.createDefaultHeaders()).thenCallRealMethod();

when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
// override baseurl
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
abfsConfig);
Expand Down
Loading