diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 80f803d80dab0..1cc7e5ec7e19e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -328,6 +328,10 @@ public class AbfsConfiguration{ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) private boolean enableAbfsListIterator; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE) + private boolean renameResilience; + public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, InvalidConfigurationValueException, IOException { this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders( @@ -1130,4 +1134,7 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; } + public boolean getRenameResilience() { + return renameResilience; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index a59f76b6d0fe0..9b1ff0d9ef1fe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -233,6 +233,9 @@ public final class ConfigurationKeys { /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */ public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit"; + /** Add extra resilience to rename failures, at the expense of performance. */ + public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience"; + public static String accountProperty(String property, String account) { return property + "." 
+ account; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 9994d9f5207f3..fcbeb1ab5387e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -118,6 +118,7 @@ public final class FileSystemConfigurations { public static final int STREAM_ID_LEN = 12; public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; + public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true; /** * Limit of queued block upload operations before writes diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 25562660ae231..582e4969a60db 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; @@ -68,6 +69,7 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; @@ -77,7 +79,9 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; /** * AbfsClient. @@ -105,6 +109,11 @@ public class AbfsClient implements Closeable { private final ListeningScheduledExecutorService executorService; + /** + * Enable resilient rename. + */ + private final boolean renameResilience; + /** logging the rename failure if metadata is in an incomplete state. 
*/ private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG); @@ -157,6 +166,9 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build(); this.executorService = MoreExecutors.listeningDecorator( HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf)); + // rename resilience + renameResilience = abfsConfiguration.getRenameResilience(); + LOG.debug("Rename resilience is {}", renameResilience); } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -519,11 +531,38 @@ public AbfsClientRenameResult renamePath( final String destination, final String continuation, final TracingContext tracingContext, - final String sourceEtag, + String sourceEtag, boolean isMetadataIncompleteState) throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + // an etag passed in implies the source is a file + final boolean hasEtag = !isEmpty(sourceEtag); + boolean isDir = !hasEtag; + if (!hasEtag && renameResilience) { + // no etag was passed in and rename resilience is enabled, so + // probe the source for its etag and file/directory status + + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext); + final AbfsHttpOperation result = srcStatusOp.getResult(); + + sourceEtag = extractEtagHeader(result); + // and update the directory status. + final String resourceType = + result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + isDir = AbfsHttpConstants.DIRECTORY.equalsIgnoreCase(resourceType); + LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir); + } catch (AbfsRestOperationException e) { + // remap a path-not-found failure of the probe to source-path-not-found + if (PATH_NOT_FOUND.equals(e.getErrorCode())) { + throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(), + e.getMessage(), e); + } + } + } + String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); if (authType == AuthType.SAS) { final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); @@ -540,12 +579,7 @@ public AbfsClientRenameResult renamePath( appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = new AbfsRestOperation( - AbfsRestOperationType.RenamePath, - this, - HTTP_METHOD_PUT, - url, - requestHeaders); + final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); try { incrementAbfsRenamePath(); op.execute(tracingContext); @@ -560,6 +594,7 @@ public AbfsClientRenameResult renamePath( if (!op.hasResult()) { throw e; } + LOG.debug("Rename of {} to {} failed, attempting recovery", source, destination, e); // ref: HADOOP-18242. Rename failure occurring due to a rare case of // tracking metadata being in incomplete state. 
@@ -588,7 +623,7 @@ public AbfsClientRenameResult renamePath( boolean etagCheckSucceeded = renameIdempotencyCheckOp( source, - sourceEtag, op, destination, tracingContext); + sourceEtag, op, destination, tracingContext, isDir); if (!etagCheckSucceeded) { // idempotency did not return different result // throw back the exception @@ -598,6 +633,17 @@ public AbfsClientRenameResult renamePath( } } + @VisibleForTesting + AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) { + AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.RenamePath, + this, + HTTP_METHOD_PUT, + url, + requestHeaders); + return op; + } + private void incrementAbfsRenamePath() { abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1); } @@ -613,10 +659,11 @@ private void incrementAbfsRenamePath() { * Exceptions raised in the probe of the destination are swallowed, * so that they do not interfere with the original rename failures. * @param source source path + * @param sourceEtag etag of the source file. May be null or empty. * @param op Rename request REST operation response with non-null HTTP response * @param destination rename destination path - * @param sourceEtag etag of source file. may be null or empty * @param tracingContext Tracks identifiers for request header + * @param isDir true if the source is a directory * @return true if the file was successfully copied */ public boolean renameIdempotencyCheckOp( @@ -624,17 +671,29 @@ public boolean renameIdempotencyCheckOp( final String sourceEtag, final AbfsRestOperation op, final String destination, - TracingContext tracingContext) { + TracingContext tracingContext, + final boolean isDir) { Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); - if ((op.isARetriedRequest()) - && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) - && isNotEmpty(sourceEtag)) { + LOG.debug("rename({}, {}) failure {}; retry={} isDir={} etag={}", + source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), isDir, sourceEtag); + if (!(op.isARetriedRequest() + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) { + // only attempt recovery if the failure was a 404 on a retried rename request. + return false; + } - // Server has returned HTTP 404, which means rename source no longer - // exists. Check on destination status and if its etag matches - // that of the source, consider it to be a success. - LOG.debug("rename {} to {} failed, checking etag of destination", + if (isDir) { + // directory recovery is not supported. + // log and fail. + LOG.info("rename directory {} to {} failed; unable to recover", + source, destination); + return false; + } + if (isNotEmpty(sourceEtag)) { + // Server has returned HTTP 404 and we have an etag, so see + // if the rename has actually taken place. + LOG.info("rename {} to {} failed, checking etag of destination", source, destination); try { @@ -642,11 +701,18 @@ && isNotEmpty(sourceEtag)) { false, tracingContext); final AbfsHttpOperation result = destStatusOp.getResult(); - return result.getStatusCode() == HttpURLConnection.HTTP_OK + final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK && sourceEtag.equals(extractEtagHeader(result)); - } catch (AzureBlobFileSystemException ignored) { + LOG.info("File rename recovery {}", + recovered ? 
"succeeded" : "failed"); + return recovered; + } catch (AzureBlobFileSystemException ex) { // GetFileStatus on the destination failed, the rename did not take place + // or some other failure. log and swallow. + LOG.debug("Failed to get status of path {}", destination, ex); } + } else { + LOG.debug("No source etag; unable to probe for the operation's success"); } return false; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java index 86e3473a9fe5d..62e27ffdc056d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java @@ -58,4 +58,13 @@ public boolean isRenameRecovered() { public boolean isIncompleteMetadataState() { return isIncompleteMetadataState; } + + @Override + public String toString() { + return "AbfsClientRenameResult{" + + "op=" + op + + ", renameRecovered=" + renameRecovered + + ", isIncompleteMetadataState=" + isIncompleteMetadataState + + '}'; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index ad99020390a5e..94510fe3f79c6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -264,26 +264,7 @@ private boolean executeHttpOperation(final int retryCount, incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); tracingContext.constructHeader(httpOperation, failureReason); - switch(client.getAuthType()) { - case Custom: - case OAuth: - LOG.debug("Authenticating request with OAuth2 access token"); - httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, - client.getAccessToken()); - break; - case SAS: - // do nothing; the SAS token should already be appended to the query string - httpOperation.setMaskForSAS(); //mask sig/oid from url for logs - break; - case SharedKey: - // sign the HTTP request - LOG.debug("Signing request with shared key"); - // sign the HTTP request - client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), - hasRequestBody ? bufferLength : 0); - break; - } + signRequest(httpOperation, hasRequestBody ? bufferLength : 0); } catch (IOException e) { LOG.debug("Auth failure: {}, {}", method, url); throw new AbfsRestOperationException(-1, null, @@ -351,6 +332,36 @@ private boolean executeHttpOperation(final int retryCount, return true; } + /** + * Sign an operation. + * @param httpOperation operation to sign + * @param bytesToSign how many bytes to sign for shared key auth. 
+ * @throws IOException failure + */ + @VisibleForTesting + public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException { + switch(client.getAuthType()) { + case Custom: + case OAuth: + LOG.debug("Authenticating request with OAuth2 access token"); + httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + break; + case SAS: + // do nothing; the SAS token should already be appended to the query string + httpOperation.setMaskForSAS(); //mask sig/oid from url for logs + break; + case SharedKey: + // sign the HTTP request + LOG.debug("Signing request with shared key"); + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + bytesToSign); + break; + } + } + /** * Creates new object of {@link AbfsHttpOperation} with the url, method, and * requestHeaders fields of the AbfsRestOperation object. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java new file mode 100644 index 0000000000000..600fa1a952c63 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +/** + * Contract test for rename operation with rename resilience disabled. + * This is critical to ensure that adding resilience does not cause + * any regressions when disabled. 
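+ * All of the superclass contract tests are rerun with fs.azure.enable.rename.resilience set to false.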
+ */ +public class ITestAbfsContractRenameWithoutResilience + extends ITestAbfsFileSystemContractRename { + + public ITestAbfsContractRenameWithoutResilience() throws Exception { + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + conf.setBoolean(ConfigurationKeys.FS_AZURE_ABFS_RENAME_RESILIENCE, false); + return conf; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 65ea79b36bd0e..421ec5ff66280 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -18,19 +18,39 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.SocketException; +import java.net.URL; +import java.time.Duration; +import java.util.List; + import org.assertj.core.api.Assertions; +import org.junit.Assume; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -123,6 +143,257 @@ public void testRenameFailuresDueToIncompleteMetadata() throws Exception { } + AbfsClient getMockAbfsClient() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + + // adding mock objects to current AbfsClient + AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); + + Mockito.doAnswer(answer -> { + AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath, + spyClient, HTTP_METHOD_PUT, answer.getArgument(0), answer.getArgument(1)); + AbfsRestOperation spiedOp = Mockito.spy(op); + addSpyBehavior(spiedOp, op, spyClient); + return spiedOp; + }).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); + + return spyClient; + + } + + /** + * Spies on a rest 
operation to inject transient failure. + * The first createHttpOperation() invocation will return an HTTP operation + * which will fail. + * @param spiedRestOp spied operation whose createHttpOperation() will fail the first time + * @param normalRestOp the normal (good) operation + * @param client abfs client. + * @throws IOException failure + */ + private void addSpyBehavior(final AbfsRestOperation spiedRestOp, + final AbfsRestOperation normalRestOp, + final AbfsClient client) + throws IOException { + AbfsHttpOperation failingOperation = Mockito.spy(normalRestOp.createHttpOperation()); + AbfsHttpOperation normalOp1 = normalRestOp.createHttpOperation(); + executeThenFail(client, normalRestOp, failingOperation, normalOp1); + AbfsHttpOperation normalOp2 = normalRestOp.createHttpOperation(); + normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + + when(spiedRestOp.createHttpOperation()) + .thenReturn(failingOperation) + .thenReturn(normalOp2); + } + + /** + * Mock an idempotency failure by executing the normal operation, then + * raising an IOE. + * @param client abfs client. + * @param normalRestOp the rest operation used to sign the requests. + * @param failingOperation failing operation + * @param normalOp good operation + * @throws IOException failure + */ + private void executeThenFail(final AbfsClient client, + final AbfsRestOperation normalRestOp, + final AbfsHttpOperation failingOperation, + final AbfsHttpOperation normalOp) + throws IOException { + Mockito.doAnswer(answer -> { + LOG.info("Executing first attempt with post-operation fault injection"); + final byte[] buffer = answer.getArgument(0); + final int offset = answer.getArgument(1); + final int length = answer.getArgument(2); + normalRestOp.signRequest(normalOp, length); + normalOp.sendRequest(buffer, offset, length); + normalOp.processResponse(buffer, offset, length); + LOG.info("Actual outcome is {} \"{}\" \"{}\"; injecting failure", + normalOp.getStatusCode(), + normalOp.getStorageErrorCode(), + normalOp.getStorageErrorMessage()); + throw new SocketException("connection-reset"); + }).when(failingOperation).sendRequest(Mockito.nullable(byte[].class), + Mockito.nullable(int.class), Mockito.nullable(int.class)); + } + + /** + * This is the good outcome: resilient rename. + */ + @Test + public void testRenameRecoverySrcDestEtagSame() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient mockClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; + + touch(new Path(path1)); + + // sourceEtag is passed in as null; the injected fault forces a retry which sees a 404 + // destination eTag matches the source -> rename should pass even though execute threw an exception + final AbfsClientRenameResult result = + mockClient.renamePath(path1, path2, null, testTracingContext, null, false); + Assertions.assertThat(result.isRenameRecovered()) + .describedAs("rename result recovered flag of %s", result) + .isTrue(); + } + + /** + * Execute a failing rename, but have the file at the far end not match. + * This is done by explicitly passing in a made-up etag for the source + * etag and creating a file at the far end. + * The first rename will actually fail with a path exists exception, + * but as that is swallowed, it's not a problem. 
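+ * The recovery probe of the destination therefore sees a non-matching etag, and the original failure is rethrown.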
+ */ + @Test + public void testRenameRecoverySourceDestEtagDifferent() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; + + touch(new Path(path2)); + + // source eTag does not match -> throw exception + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, "source", false)) + ); + } + + /** + * Assert that an exception carries a specific error code. + * @param code expected error code + * @param e exception to check + * @throws AbfsRestOperationException the exception, rethrown if its error code does not match + */ + private static void expectErrorCode(final AzureServiceErrorCode code, + final AbfsRestOperationException e) throws AbfsRestOperationException { + if (e.getErrorCode() != code) { + throw e; + } + } + + /** + * Directory rename failure is unrecoverable. + */ + @Test + public void testDirRenameRecoveryUnsupported() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + fs.mkdirs(new Path(path1)); + + // directory rename recovery is unsupported -> the source-not-found failure surfaces + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false))); + } + + /** + * Even with injected failures, a rename onto an existing destination + * must still be rejected. + */ + @Test + public void testExistingPathCorrectlyRejected() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + touch(new Path(path1)); + touch(new Path(path2)); + + // destination already exists -> rename must fail with path-already-exists + expectErrorCode(PATH_ALREADY_EXISTS, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false))); + } + + /** + * Test that the resilient commit code works through fault injection, including + * reporting recovery. + */ + @Test + public void testResilientCommitOperation() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + final AzureBlobFileSystemStore store = fs.getAbfsStore(); + Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext)); + + // patch in the mock abfs client to the filesystem, for the resilient + // commit API to pick up. 
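+ // (the mock client injects a connection reset after the first rename attempt, forcing the recovery path)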
+ setAbfsClient(store, getMockAbfsClient()); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + final Path source = new Path(path1); + touch(source); + final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); + + final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); + final Pair<Boolean, Duration> outcome = + commit.commitSingleFileByRename(source, new Path(path2), sourceTag); + Assertions.assertThat(outcome.getKey()) + .describedAs("recovery flag") + .isTrue(); + } + /** + * Test that the resilient commit call fails with a FileNotFoundException + * when the etag passed in does not match that of the source file. + */ + @Test + public void testResilientCommitOperationTagMismatch() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + final AzureBlobFileSystemStore store = fs.getAbfsStore(); + Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext)); + + // patch in the mock abfs client to the filesystem, for the resilient + // commit API to pick up. + setAbfsClient(store, getMockAbfsClient()); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + final Path source = new Path(path1); + touch(source); + final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); + + final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); + intercept(FileNotFoundException.class, () -> + commit.commitSingleFileByRename(source, new Path(path2), "not the right tag")); + } + /** * Method to create an AbfsRestOperationException. * @param statusCode status code to be used.