From e672ae365bc10017e6776b50878f355ea8293c31 Mon Sep 17 00:00:00 2001 From: sreeb-msft Date: Fri, 17 Mar 2023 10:03:46 +0530 Subject: [PATCH 01/10] Rename retry recovery changes --- .../fs/azurebfs/services/AbfsClient.java | 28 ++++-- .../services/TestAbfsRenameRetryRecovery.java | 86 +++++++++++++++++++ 2 files changed, 107 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 25562660ae231..010ce304ca702 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -519,11 +519,19 @@ public AbfsClientRenameResult renamePath( final String destination, final String continuation, final TracingContext tracingContext, - final String sourceEtag, + String sourceEtag, boolean isMetadataIncompleteState) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); + if (sourceEtag == null || sourceEtag.isEmpty()) { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext); + final AbfsHttpOperation result = srcStatusOp.getResult(); + + sourceEtag = extractEtagHeader(result); + } + String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); if (authType == AuthType.SAS) { final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); @@ -540,12 +548,7 @@ public AbfsClientRenameResult renamePath( appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = new AbfsRestOperation( - AbfsRestOperationType.RenamePath, - this, - HTTP_METHOD_PUT, - url, - requestHeaders); + final AbfsRestOperation op = 
createRenameRestOperation(url, requestHeaders); try { incrementAbfsRenamePath(); op.execute(tracingContext); @@ -598,6 +601,17 @@ public AbfsClientRenameResult renamePath( } } + @VisibleForTesting + AbfsRestOperation createRenameRestOperation(URL url, List requestHeaders) { + AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.RenamePath, + this, + HTTP_METHOD_PUT, + url, + requestHeaders); + return op; + } + private void incrementAbfsRenamePath() { abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 65ea79b36bd0e..f3670a20883a1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -18,8 +18,13 @@ package org.apache.hadoop.fs.azurebfs.services; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.assertj.core.api.Assertions; +import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,9 +33,17 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import javax.net.ssl.HttpsURLConnection; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; + import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -123,6 +136,79 @@ public void testRenameFailuresDueToIncompleteMetadata() throws Exception { } + @Test + public void testSourceNotFoundRetrySuccess() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + + // specifying AbfsHttpOperation mock behavior + AbfsHttpOperation mockHttp404Op = Mockito.mock(AbfsHttpOperation.class); + + Mockito.doReturn(404).when(mockHttp404Op).getStatusCode(); + Mockito.doNothing().when(mockHttp404Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); + Mockito.doNothing().when(mockHttp404Op).setRequestProperty(nullable(String.class), nullable(String.class)); + Mockito.doNothing().when(mockHttp404Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); + Mockito.doReturn("PUT").when(mockHttp404Op).getMethod(); + Mockito.doReturn("Source Path not found").when(mockHttp404Op).getStorageErrorMessage(); + Mockito.doReturn("SourcePathNotFound").when(mockHttp404Op).getStorageErrorCode(); + + + AbfsHttpOperation mockHttp500Op = Mockito.mock(AbfsHttpOperation.class); + Mockito.doReturn(500).when(mockHttp500Op).getStatusCode(); + Mockito.doThrow(IOException.class) + .when(mockHttp500Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); + Mockito.doNothing().when(mockHttp500Op).setRequestProperty(nullable(String.class), nullable(String.class)); + Mockito.doNothing().when(mockHttp500Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); + Mockito.doReturn("PUT").when(mockHttp500Op).getMethod(); + + // creating mock HttpUrlConnection object + 
HttpURLConnection mockUrlConn = Mockito.mock(HttpsURLConnection.class); + + // tying all mocks together + Mockito.doReturn(mockUrlConn).when(mockHttp404Op).getConnection(); + Mockito.doReturn(mockUrlConn).when(mockHttp500Op).getConnection(); + + // adding mock objects to current AbfsClient + AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); + AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation( + AbfsRestOperationType.RenamePath, + spyClient, + HTTP_METHOD_PUT, + null, + null) + ); + Mockito.doReturn(mockRestOp).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); + + Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).createHttpOperation(); + Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).getResult(); + + Mockito.doReturn(true).when(mockRestOp).isARetriedRequest(); + Mockito.doReturn(true).when(mockRestOp).hasResult(); + + SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); + Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); + Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).when(spyClient).getSharedKeyCredentials(); + + String path1 = "/dummyFile1"; + String path2 = "/dummyFile2"; + + fs.create(new Path(path1)); + fs.create(new Path(path2)); + + TracingContext testTracingContext = getTestTracingContext(fs, false); + + // 404 and retry, send sourceEtag as null + // source eTag matches -> rename should pass even when execute throws exception + spyClient.renamePath(path1, path1, null, testTracingContext, null, false); + + // source eTag does not match -> throw exception + try { + spyClient.renamePath(path1, path2,null, testTracingContext, null, false); + } catch (AbfsRestOperationException e) { + Assert.assertEquals(200, e.getErrorCode()); + } + + } + /** * Method to create an AbfsRestOperationException. * @param statusCode status code to be used. 
From f53b43cbb254be7d0a145daa0e87aa31fb7a1940 Mon Sep 17 00:00:00 2001 From: sreeb-msft Date: Fri, 17 Mar 2023 11:15:56 +0530 Subject: [PATCH 02/10] Changes to mock behavior --- .../azurebfs/services/TestAbfsRenameRetryRecovery.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index f3670a20883a1..d49493fa67d2b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -186,7 +186,7 @@ public void testSourceNotFoundRetrySuccess() throws IOException { SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); - Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).when(spyClient).getSharedKeyCredentials(); + Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).doReturn(mockSharedKeyCreds).doCallRealMethod().when(spyClient).getSharedKeyCredentials(); String path1 = "/dummyFile1"; String path2 = "/dummyFile2"; @@ -197,15 +197,17 @@ public void testSourceNotFoundRetrySuccess() throws IOException { TracingContext testTracingContext = getTestTracingContext(fs, false); // 404 and retry, send sourceEtag as null - // source eTag matches -> rename should pass even when execute throws exception - spyClient.renamePath(path1, path1, null, testTracingContext, null, false); // source eTag does not match -> throw exception try { spyClient.renamePath(path1, path2,null, testTracingContext, null, false); } catch (AbfsRestOperationException e) { - Assert.assertEquals(200, e.getErrorCode()); + 
Assert.assertEquals(404, e.getErrorCode()); } + // source eTag matches -> rename should pass even when execute throws exception + spyClient.renamePath(path1, path1, null, testTracingContext, null, false); + + } From 1abc380679f88f1bcd30e3dc39e28a8caa10109e Mon Sep 17 00:00:00 2001 From: sreeb-msft Date: Fri, 17 Mar 2023 11:21:48 +0530 Subject: [PATCH 03/10] Added comments --- .../fs/azurebfs/services/TestAbfsRenameRetryRecovery.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index d49493fa67d2b..99d5dff1f6c36 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -141,8 +141,9 @@ public void testSourceNotFoundRetrySuccess() throws IOException { AzureBlobFileSystem fs = getFileSystem(); // specifying AbfsHttpOperation mock behavior - AbfsHttpOperation mockHttp404Op = Mockito.mock(AbfsHttpOperation.class); + // mock object representing the 404 path not found result + AbfsHttpOperation mockHttp404Op = Mockito.mock(AbfsHttpOperation.class); Mockito.doReturn(404).when(mockHttp404Op).getStatusCode(); Mockito.doNothing().when(mockHttp404Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); Mockito.doNothing().when(mockHttp404Op).setRequestProperty(nullable(String.class), nullable(String.class)); @@ -152,6 +153,7 @@ public void testSourceNotFoundRetrySuccess() throws IOException { Mockito.doReturn("SourcePathNotFound").when(mockHttp404Op).getStorageErrorCode(); + // mock object representing the 500 timeout result for first try of rename AbfsHttpOperation mockHttp500Op = 
Mockito.mock(AbfsHttpOperation.class); Mockito.doReturn(500).when(mockHttp500Op).getStatusCode(); Mockito.doThrow(IOException.class) @@ -169,6 +171,8 @@ public void testSourceNotFoundRetrySuccess() throws IOException { // adding mock objects to current AbfsClient AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); + // Rest Operation is spied as it needs to have spyclient instance as a param to the constructor + // directly returning a mock for this would make the client instance null AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation( AbfsRestOperationType.RenamePath, spyClient, @@ -186,6 +190,8 @@ public void testSourceNotFoundRetrySuccess() throws IOException { SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); + // real method calls made once at start and once at end + // for the two getPathStatus calls that actually have to be made Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).doReturn(mockSharedKeyCreds).doCallRealMethod().when(spyClient).getSharedKeyCredentials(); String path1 = "/dummyFile1"; From e8ce29ffce56e1dfb81b7039333d9672ab06a0e8 Mon Sep 17 00:00:00 2001 From: sreeb-msft Date: Fri, 17 Mar 2023 12:37:55 +0530 Subject: [PATCH 04/10] Changes to test methods --- .../services/TestAbfsRenameRetryRecovery.java | 47 +++++++++++++++---- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 99d5dff1f6c36..4738d9be3dc1b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.assertj.core.api.Assertions; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -42,6 +43,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doReturn; @@ -136,8 +138,7 @@ public void testRenameFailuresDueToIncompleteMetadata() throws Exception { } - @Test - public void testSourceNotFoundRetrySuccess() throws IOException { + AbfsClient getMockAbfsClient() throws IOException { AzureBlobFileSystem fs = getFileSystem(); // specifying AbfsHttpOperation mock behavior @@ -161,6 +162,7 @@ public void testSourceNotFoundRetrySuccess() throws IOException { Mockito.doNothing().when(mockHttp500Op).setRequestProperty(nullable(String.class), nullable(String.class)); Mockito.doNothing().when(mockHttp500Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); Mockito.doReturn("PUT").when(mockHttp500Op).getMethod(); + Mockito.doReturn("ClientTimeoutError").when(mockHttp500Op).getStorageErrorCode(); // creating mock HttpUrlConnection object HttpURLConnection mockUrlConn = Mockito.mock(HttpsURLConnection.class); @@ -185,36 +187,61 @@ public void testSourceNotFoundRetrySuccess() throws IOException { Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).createHttpOperation(); 
Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).getResult(); - Mockito.doReturn(true).when(mockRestOp).isARetriedRequest(); Mockito.doReturn(true).when(mockRestOp).hasResult(); + SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); // real method calls made once at start and once at end // for the two getPathStatus calls that actually have to be made Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).doReturn(mockSharedKeyCreds).doCallRealMethod().when(spyClient).getSharedKeyCredentials(); + return spyClient; + + } + + @Test + public void testRenameRecoverySrcDestEtagSame() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient mockClient = getMockAbfsClient(); + + String path1 = "/dummyFile1"; String path2 = "/dummyFile2"; fs.create(new Path(path1)); fs.create(new Path(path2)); + // 404 and retry, send sourceEtag as null + // source eTag matches -> rename should pass even when execute throws exception + mockClient.renamePath(path1, path1, null, testTracingContext, null, false); + } + + @Test + public void testRenameRecoverySrcDestEtagDifferent() throws IOException { + AzureBlobFileSystem fs = getFileSystem(); TracingContext testTracingContext = getTestTracingContext(fs, false); - // 404 and retry, send sourceEtag as null + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String path1 = "/dummyFile1"; + String path2 = "/dummyFile2"; + + fs.create(new Path(path1)); + fs.create(new Path(path2)); // source eTag does not match -> throw exception try { spyClient.renamePath(path1, path2,null, testTracingContext, null, false); } catch 
(AbfsRestOperationException e) { - Assert.assertEquals(404, e.getErrorCode()); + Assert.assertEquals(SOURCE_PATH_NOT_FOUND, e.getErrorCode()); } - // source eTag matches -> rename should pass even when execute throws exception - spyClient.renamePath(path1, path1, null, testTracingContext, null, false); - - - } /** From 98b3de7c5b68faf01cc06e1a6d252ee651f7407b Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 15 Mar 2023 13:55:20 +0000 Subject: [PATCH 05/10] HADOOP-18425. ABFS rename resilience through etags If "fs.azure.enable.rename.resilience" is true, then do a HEAD of the source file before the rename, which can then be used to recover from the failure, as the manifest committer does (HADOOP-18163). Change-Id: Ia417f1501f7274662eb9ff919c6378fb913b476b HADOOP-18425. ABFS rename resilience through etags only get the etag on HNS stores Change-Id: I9faffa78294e1782f0b2db3d1c997ec3fe53637c --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 7 ++++ .../fs/azurebfs/AzureBlobFileSystem.java | 33 +++++++++++++++++-- .../azurebfs/constants/ConfigurationKeys.java | 3 ++ .../constants/FileSystemConfigurations.java | 1 + .../fs/azurebfs/services/AbfsClient.java | 8 ++--- 5 files changed, 44 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 80f803d80dab0..1cc7e5ec7e19e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -328,6 +328,10 @@ public class AbfsConfiguration{ FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR) private boolean enableAbfsListIterator; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = 
DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE) + private boolean renameResilience; + public AbfsConfiguration(final Configuration rawConfig, String accountName) throws IllegalAccessException, InvalidConfigurationValueException, IOException { this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders( @@ -1130,4 +1134,7 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; } + public boolean getRenameResilience() { + return renameResilience; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 5534b5fb44a51..9facc44cd3b31 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -46,6 +46,9 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,6 +157,11 @@ public class AzureBlobFileSystem extends FileSystem /** Rate limiting for operations which use it to throttle their IO. */ private RateLimiting rateLimiting; + /** + * Enable resilient rename. 
+ */ + private boolean renameResilience; + @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -226,6 +234,8 @@ public void initialize(URI uri, Configuration configuration) } rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); + + renameResilience = abfsConfiguration.getRenameResilience(); LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); } @@ -442,10 +452,13 @@ public boolean rename(final Path src, final Path dst) throws IOException { } // Non-HNS account need to check dst status on driver side. - if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) { + final boolean isNamespaceEnabled = abfsStore.getIsNamespaceEnabled(tracingContext); + if (!isNamespaceEnabled && dstFileStatus == null) { dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext); } + FileStatus sourceFileStatus = null; + try { String sourceFileName = src.getName(); Path adjustedDst = dst; @@ -459,10 +472,24 @@ public boolean rename(final Path src, final Path dst) throws IOException { qualifiedDstPath = makeQualified(adjustedDst); - abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null); + String etag = null; + if (renameResilience && isNamespaceEnabled) { + // for resilient rename on an HNS store, get the etag before + // attempting the rename, and pass it down + sourceFileStatus = abfsStore.getFileStatus(qualifiedSrcPath, tracingContext); + etag = ((EtagSource) sourceFileStatus).getEtag(); + } + boolean recovered = abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, + etag); + if (recovered) { + LOG.info("Recovered from rename failure of {} to {}", + qualifiedSrcPath, qualifiedDstPath); + } return true; } catch (AzureBlobFileSystemException ex) { - LOG.debug("Rename operation failed. ", ex); + LOG.debug("Rename {} to {} failed. 
source {} dest {}", + qualifiedSrcPath, qualifiedDstPath, + sourceFileStatus, dstFileStatus, ex); checkException( src, ex, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index a59f76b6d0fe0..9b1ff0d9ef1fe 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -233,6 +233,9 @@ public final class ConfigurationKeys { /** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */ public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit"; + /** Add extra resilience to rename failures, at the expense of performance. */ + public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience"; + public static String accountProperty(String property, String account) { return property + "." 
+ account; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 9994d9f5207f3..fcbeb1ab5387e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -118,6 +118,7 @@ public final class FileSystemConfigurations { public static final int STREAM_ID_LEN = 12; public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; + public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true; /** * Limit of queued block upload operations before writes diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 010ce304ca702..d5901ab9752e2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -563,6 +563,7 @@ public AbfsClientRenameResult renamePath( if (!op.hasResult()) { throw e; } + LOG.debug("Rename of {} to {} failed, attempting recovery", source, destination, e); // ref: HADOOP-18242. Rename failure occurring due to a rare case of // tracking metadata being in incomplete state. @@ -577,18 +578,15 @@ public AbfsClientRenameResult renamePath( // then we can retry the rename operation. AbfsRestOperation sourceStatusOp = getPathStatus(source, false, tracingContext); - isMetadataIncompleteState = true; // Extract the sourceEtag, using the status Op, and set it // for future rename recovery. 
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); renamePath(source, destination, continuation, tracingContext, - sourceEtagAfterFailure, isMetadataIncompleteState); + sourceEtagAfterFailure, true); } // if we get out of the condition without a successful rename, then // it isn't metadata incomplete state issue. - isMetadataIncompleteState = false; - boolean etagCheckSucceeded = renameIdempotencyCheckOp( source, sourceEtag, op, destination, tracingContext); @@ -597,7 +595,7 @@ public AbfsClientRenameResult renamePath( // throw back the exception throw e; } - return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); + return new AbfsClientRenameResult(op, true, false); } } From 70a8d0fb3956b2a0e2c343373475ef7a0e75ab08 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sun, 19 Mar 2023 19:30:32 +0000 Subject: [PATCH 06/10] HADOOP-18425. ABFS rename resilience through etags 1. move config checks of rename resilience flag into AbfsClient 2. only getPathStatus on rename if enabled 3. resilience handling logs when unable to recover from a dir 4. and when it successfully recovers a file. 
Change-Id: I58b5f11e4c9b7c1a1d809d2db47a3cafe51f2060 --- .../apache/hadoop/fs/FilterFileSystem.java | 2 + .../fs/azurebfs/AzureBlobFileSystem.java | 30 +-------- .../fs/azurebfs/services/AbfsClient.java | 64 +++++++++++++++---- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index cdbe51e330701..0fe37dca5a918 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -76,6 +76,8 @@ public FilterFileSystem(FileSystem fs) { this.statistics = fs.statistics; } + + /** * Get the raw file system * @return FileSystem being filtered diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 9facc44cd3b31..ad66d384d1cef 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -157,11 +157,6 @@ public class AzureBlobFileSystem extends FileSystem /** Rate limiting for operations which use it to throttle their IO. */ private RateLimiting rateLimiting; - /** - * Enable resilient rename. 
- */ - private boolean renameResilience; - @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -234,8 +229,6 @@ public void initialize(URI uri, Configuration configuration) } rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); - - renameResilience = abfsConfiguration.getRenameResilience(); LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); } @@ -452,13 +445,10 @@ public boolean rename(final Path src, final Path dst) throws IOException { } // Non-HNS account need to check dst status on driver side. - final boolean isNamespaceEnabled = abfsStore.getIsNamespaceEnabled(tracingContext); - if (!isNamespaceEnabled && dstFileStatus == null) { + if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) { dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext); } - FileStatus sourceFileStatus = null; - try { String sourceFileName = src.getName(); Path adjustedDst = dst; @@ -472,24 +462,10 @@ public boolean rename(final Path src, final Path dst) throws IOException { qualifiedDstPath = makeQualified(adjustedDst); - String etag = null; - if (renameResilience && isNamespaceEnabled) { - // for resilient rename on an HNS store, get the etag before - // attempting the rename, and pass it down - sourceFileStatus = abfsStore.getFileStatus(qualifiedSrcPath, tracingContext); - etag = ((EtagSource) sourceFileStatus).getEtag(); - } - boolean recovered = abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, - etag); - if (recovered) { - LOG.info("Recovered from rename failure of {} to {}", - qualifiedSrcPath, qualifiedDstPath); - } + abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null); return true; } catch (AzureBlobFileSystemException ex) { - LOG.debug("Rename {} to {} failed. source {} dest {}", - qualifiedSrcPath, qualifiedDstPath, - sourceFileStatus, dstFileStatus, ex); + LOG.debug("Rename operation failed. 
", ex); checkException( src, ex, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index d5901ab9752e2..061a10ade2593 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -68,6 +68,7 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; @@ -105,6 +106,11 @@ public class AbfsClient implements Closeable { private final ListeningScheduledExecutorService executorService; + /** + * Enable resilient rename. + */ + private boolean renameResilience; + /** logging the rename failure if metadata is in an incomplete state. 
*/ private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG); @@ -157,6 +163,9 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build(); this.executorService = MoreExecutors.listeningDecorator( HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf)); + // rename resilience + renameResilience = abfsConfiguration.getRenameResilience(); + LOG.debug("Rename resilience is {}",renameResilience); } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -524,12 +533,19 @@ public AbfsClientRenameResult renamePath( throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); - if (sourceEtag == null || sourceEtag.isEmpty()) { + // etag passed in, so source is a file + final boolean hasEtag = isEmpty(sourceEtag); + boolean isDir = !hasEtag; + if (!hasEtag && renameResilience) { + // no etag was passed in and rename resilience is enabled, so + // get the value final AbfsRestOperation srcStatusOp = getPathStatus(source, false, tracingContext); final AbfsHttpOperation result = srcStatusOp.getResult(); sourceEtag = extractEtagHeader(result); + // and update the directory status. + isDir = isEmpty(sourceEtag); } String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); @@ -578,24 +594,27 @@ public AbfsClientRenameResult renamePath( // then we can retry the rename operation. AbfsRestOperation sourceStatusOp = getPathStatus(source, false, tracingContext); + isMetadataIncompleteState = true; // Extract the sourceEtag, using the status Op, and set it // for future rename recovery. 
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); renamePath(source, destination, continuation, tracingContext, - sourceEtagAfterFailure, true); + sourceEtagAfterFailure, isMetadataIncompleteState); } // if we get out of the condition without a successful rename, then // it isn't metadata incomplete state issue. + isMetadataIncompleteState = false; + boolean etagCheckSucceeded = renameIdempotencyCheckOp( source, - sourceEtag, op, destination, tracingContext); + sourceEtag, op, destination, tracingContext, isDir); if (!etagCheckSucceeded) { // idempotency did not return different result // throw back the exception throw e; } - return new AbfsClientRenameResult(op, true, false); + return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); } } @@ -625,10 +644,11 @@ private void incrementAbfsRenamePath() { * Exceptions raised in the probe of the destination are swallowed, * so that they do not interfere with the original rename failures. * @param source source path + * @param sourceEtag etag of source file. may be null or empty * @param op Rename request REST operation response with non-null HTTP response * @param destination rename destination path - * @param sourceEtag etag of source file. 
may be null or empty * @param tracingContext Tracks identifiers for request header + * @param isDir is the source a file or directory * @return true if the file was successfully copied */ public boolean renameIdempotencyCheckOp( @@ -636,17 +656,31 @@ public boolean renameIdempotencyCheckOp( final String sourceEtag, final AbfsRestOperation op, final String destination, - TracingContext tracingContext) { + TracingContext tracingContext, + final boolean isDir) { Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); - if ((op.isARetriedRequest()) - && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) - && isNotEmpty(sourceEtag)) { - + if (!(op.isARetriedRequest()) + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) { + // not an error + return false; + } + LOG.debug("Source not found on retry of rename({}, {}) isDir {} etag {}", + source, destination, isDir, sourceEtag); + if (isDir) { + // directory recovery is not supported. + // log and fail. + LOG.info("rename directory {} to {} failed; unable to recover", + source, destination); + return false; + } + if (isNotEmpty(sourceEtag)) { // Server has returned HTTP 404, which means rename source no longer // exists. Check on destination status and if its etag matches // that of the source, consider it to be a success. - LOG.debug("rename {} to {} failed, checking etag of destination", + // the source tag was either passed in from a manifest commit or + // retrieved when rename recovery is enabled. 
+ LOG.info("rename {} to {} failed, checking etag of destination", source, destination); try { @@ -654,10 +688,14 @@ && isNotEmpty(sourceEtag)) { false, tracingContext); final AbfsHttpOperation result = destStatusOp.getResult(); - return result.getStatusCode() == HttpURLConnection.HTTP_OK + final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK && sourceEtag.equals(extractEtagHeader(result)); - } catch (AzureBlobFileSystemException ignored) { + LOG.info("File rename has taken place: recovery completed"); + return recovered; + } catch (AzureBlobFileSystemException ex) { // GetFileStatus on the destination failed, the rename did not take place + // or some other failure. log and swallow. + LOG.debug("Failed to get status of path {}", destination, ex); } } return false; From 6b2ad465d79148148238fb4c7dbe847a9e338e19 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 20 Mar 2023 13:32:24 +0000 Subject: [PATCH 07/10] HADOOP-18425. ABFS rename resilience: review feedback + new itest, ITestAbfsContractRenameWithoutResilience this disables resilience and so verifies the normal codepath is still good. 
Change-Id: Ib2663c70afb112c9430043e94d75e9ddf7b3c313 --- .../apache/hadoop/fs/FilterFileSystem.java | 2 - .../fs/azurebfs/AzureBlobFileSystem.java | 3 -- .../fs/azurebfs/services/AbfsClient.java | 22 +++++----- ...stAbfsContractRenameWithoutResilience.java | 42 +++++++++++++++++++ .../services/TestAbfsRenameRetryRecovery.java | 19 +++++---- 5 files changed, 63 insertions(+), 25 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 0fe37dca5a918..cdbe51e330701 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -76,8 +76,6 @@ public FilterFileSystem(FileSystem fs) { this.statistics = fs.statistics; } - - /** * Get the raw file system * @return FileSystem being filtered diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index ad66d384d1cef..5534b5fb44a51 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -46,9 +46,6 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.fs.EtagSource; -import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; -import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 061a10ade2593..7477ac4ae247c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -109,7 +109,7 @@ public class AbfsClient implements Closeable { /** * Enable resilient rename. */ - private boolean renameResilience; + private final boolean renameResilience; /** logging the rename failure if metadata is in an incomplete state. */ private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = @@ -534,7 +534,7 @@ public AbfsClientRenameResult renamePath( final List requestHeaders = createDefaultHeaders(); // etag passed in, so source is a file - final boolean hasEtag = isEmpty(sourceEtag); + final boolean hasEtag = !isEmpty(sourceEtag); boolean isDir = !hasEtag; if (!hasEtag && renameResilience) { // no etag was passed in and rename resilience is enabled, so @@ -660,9 +660,11 @@ public boolean renameIdempotencyCheckOp( final boolean isDir) { Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); - if (!(op.isARetriedRequest()) - && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) { - // not an error + if (!(op.isARetriedRequest() + && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) { + // this failed on the first attempt (no not retry related) + // *or* it was any error other than 404 + // do not attempt to recover from this failure. return false; } LOG.debug("Source not found on retry of rename({}, {}) isDir {} etag {}", @@ -675,11 +677,8 @@ public boolean renameIdempotencyCheckOp( return false; } if (isNotEmpty(sourceEtag)) { - // Server has returned HTTP 404, which means rename source no longer - // exists. 
Check on destination status and if its etag matches - // that of the source, consider it to be a success. - // the source tag was either passed in from a manifest commit or - // retrieved when rename recovery is enabled. + // Server has returned HTTP 404, we have an etag, so see + // if the rename has actually taken place, LOG.info("rename {} to {} failed, checking etag of destination", source, destination); @@ -690,7 +689,8 @@ public boolean renameIdempotencyCheckOp( final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK && sourceEtag.equals(extractEtagHeader(result)); - LOG.info("File rename has taken place: recovery completed"); + LOG.info("File rename has taken place: recovery {}", + recovered ? "succeeded" : "failed"); return recovered; } catch (AzureBlobFileSystemException ex) { // GetFileStatus on the destination failed, the rename did not take place diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java new file mode 100644 index 0000000000000..600fa1a952c63 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractRenameWithoutResilience.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +/** + * Contract test for rename operation with rename resilience disabled. + * This is critical to ensure that adding resilience does not cause + * any regressions when disabled. + */ +public class ITestAbfsContractRenameWithoutResilience + extends ITestAbfsFileSystemContractRename { + + public ITestAbfsContractRenameWithoutResilience() throws Exception { + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + conf.setBoolean(ConfigurationKeys.FS_AZURE_ABFS_RENAME_RESILIENCE, false); + return conf; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 4738d9be3dc1b..90748f8225de6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import 
org.apache.hadoop.fs.contract.ContractTestUtils; import javax.net.ssl.HttpsURLConnection; @@ -213,8 +214,8 @@ public void testRenameRecoverySrcDestEtagSame() throws IOException { String path1 = "/dummyFile1"; String path2 = "/dummyFile2"; - fs.create(new Path(path1)); - fs.create(new Path(path2)); + touch(new Path(path1)); + touch(new Path(path2)); // 404 and retry, send sourceEtag as null // source eTag matches -> rename should pass even when execute throws exception @@ -222,7 +223,7 @@ public void testRenameRecoverySrcDestEtagSame() throws IOException { } @Test - public void testRenameRecoverySrcDestEtagDifferent() throws IOException { + public void testRenameRecoverySrcDestEtagDifferent() throws Exception { AzureBlobFileSystem fs = getFileSystem(); TracingContext testTracingContext = getTestTracingContext(fs, false); @@ -233,14 +234,14 @@ public void testRenameRecoverySrcDestEtagDifferent() throws IOException { String path1 = "/dummyFile1"; String path2 = "/dummyFile2"; - fs.create(new Path(path1)); - fs.create(new Path(path2)); + touch(new Path(path1)); + touch(new Path(path2)); // source eTag does not match -> throw exception - try { - spyClient.renamePath(path1, path2,null, testTracingContext, null, false); - } catch (AbfsRestOperationException e) { - Assert.assertEquals(SOURCE_PATH_NOT_FOUND, e.getErrorCode()); + AbfsRestOperationException e = intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false)); + if (e.getErrorCode() != SOURCE_PATH_NOT_FOUND) { + throw e; } } From c747946d4ad11c6dd44b4185ee25524f3229ab86 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 20 Mar 2023 00:40:29 -0700 Subject: [PATCH 08/10] suggestions from pranav Change-Id: I1db3878ee12ea082e00438781e1ae86af9850ff7 --- .../services/TestAbfsRenameRetryRecovery.java | 136 ++++++++++++------ 1 file changed, 92 insertions(+), 44 deletions(-) diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 90748f8225de6..7ce4c60a1db5b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.assertj.core.api.Assertions; @@ -39,6 +40,7 @@ import java.io.IOException; import java.net.HttpURLConnection; +import java.net.SocketException; import java.net.URL; import java.util.List; @@ -145,62 +147,110 @@ AbfsClient getMockAbfsClient() throws IOException { // specifying AbfsHttpOperation mock behavior // mock object representing the 404 path not found result - AbfsHttpOperation mockHttp404Op = Mockito.mock(AbfsHttpOperation.class); - Mockito.doReturn(404).when(mockHttp404Op).getStatusCode(); - Mockito.doNothing().when(mockHttp404Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); - Mockito.doNothing().when(mockHttp404Op).setRequestProperty(nullable(String.class), nullable(String.class)); - Mockito.doNothing().when(mockHttp404Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); - Mockito.doReturn("PUT").when(mockHttp404Op).getMethod(); - Mockito.doReturn("Source Path not found").when(mockHttp404Op).getStorageErrorMessage(); - Mockito.doReturn("SourcePathNotFound").when(mockHttp404Op).getStorageErrorCode(); - - - // // mock object representing the 500 timeout result for 
first try of rename - AbfsHttpOperation mockHttp500Op = Mockito.mock(AbfsHttpOperation.class); - Mockito.doReturn(500).when(mockHttp500Op).getStatusCode(); - Mockito.doThrow(IOException.class) - .when(mockHttp500Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); - Mockito.doNothing().when(mockHttp500Op).setRequestProperty(nullable(String.class), nullable(String.class)); - Mockito.doNothing().when(mockHttp500Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); - Mockito.doReturn("PUT").when(mockHttp500Op).getMethod(); - Mockito.doReturn("ClientTimeoutError").when(mockHttp500Op).getStorageErrorCode(); - - // creating mock HttpUrlConnection object - HttpURLConnection mockUrlConn = Mockito.mock(HttpsURLConnection.class); - - // tying all mocks together - Mockito.doReturn(mockUrlConn).when(mockHttp404Op).getConnection(); - Mockito.doReturn(mockUrlConn).when(mockHttp500Op).getConnection(); +// AbfsHttpOperation mockHttp404Op = Mockito.mock(AbfsHttpOperation.class); +// Mockito.doReturn(404).when(mockHttp404Op).getStatusCode(); +// Mockito.doNothing().when(mockHttp404Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); +// Mockito.doNothing().when(mockHttp404Op).setRequestProperty(nullable(String.class), nullable(String.class)); +// Mockito.doNothing().when(mockHttp404Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); +// Mockito.doReturn("PUT").when(mockHttp404Op).getMethod(); +// Mockito.doReturn("Source Path not found").when(mockHttp404Op).getStorageErrorMessage(); +// Mockito.doReturn("SourcePathNotFound").when(mockHttp404Op).getStorageErrorCode(); +// +// +// // // mock object representing the 500 timeout result for first try of rename +// AbfsHttpOperation mockHttp500Op = Mockito.mock(AbfsHttpOperation.class); +// Mockito.doReturn(500).when(mockHttp500Op).getStatusCode(); +// Mockito.doThrow(IOException.class) 
+// .when(mockHttp500Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); +// Mockito.doNothing().when(mockHttp500Op).setRequestProperty(nullable(String.class), nullable(String.class)); +// Mockito.doNothing().when(mockHttp500Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); +// Mockito.doReturn("PUT").when(mockHttp500Op).getMethod(); +// Mockito.doReturn("ClientTimeoutError").when(mockHttp500Op).getStorageErrorCode(); +// +// // creating mock HttpUrlConnection object +// HttpURLConnection mockUrlConn = Mockito.mock(HttpsURLConnection.class); +// +// // tying all mocks together +// Mockito.doReturn(mockUrlConn).when(mockHttp404Op).getConnection(); +// Mockito.doReturn(mockUrlConn).when(mockHttp500Op).getConnection(); // adding mock objects to current AbfsClient AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); // Rest Operation is spied as it needs to have spyclient instance as a param to the constructor // directly returning a mock for this would make the client instance null - AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation( - AbfsRestOperationType.RenamePath, - spyClient, - HTTP_METHOD_PUT, - null, - null) - ); - Mockito.doReturn(mockRestOp).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); +// AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation( +// AbfsRestOperationType.RenamePath, +// spyClient, +// HTTP_METHOD_PUT, +// null, +// null) +// ); - Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).createHttpOperation(); - Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).getResult(); - Mockito.doReturn(true).when(mockRestOp).hasResult(); + Mockito.doAnswer(answer -> { + AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath, + spyClient, HTTP_METHOD_PUT, answer.getArgument(0), answer.getArgument(1)); + AbfsRestOperation spiedOp = 
Mockito.spy(op); + addSpyBehavior(spiedOp, op, spyClient); + return spiedOp; + }).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); +// Mockito.doReturn(mockRestOp).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); +// +// Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).createHttpOperation(); - SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); - Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); - // real method calls made once at start and once at end - // for the two getPathStatus calls that actually have to be made - Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).doReturn(mockSharedKeyCreds).doCallRealMethod().when(spyClient).getSharedKeyCredentials(); + + + +// Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).getResult(); + +// Mockito.doReturn(true).when(mockRestOp).hasResult(); + + +// SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); +// Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); +// // real method calls made once at start and once at end +// // for the two getPathStatus calls that actually have to be made +// Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).doReturn(mockSharedKeyCreds).doCallRealMethod().when(spyClient).getSharedKeyCredentials(); return spyClient; } + private void addSpyBehavior(final AbfsRestOperation spiedOp, final AbfsRestOperation normalOp, AbfsClient client) + throws IOException { + AbfsHttpOperation abfsHttpOperation = Mockito.spy(normalOp.createHttpOperation()); + AbfsHttpOperation normalOp1 = normalOp.createHttpOperation(); + normalOp1.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + AbfsHttpOperation normalOp2 = normalOp.createHttpOperation(); + 
normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + + int[] hits = new int[1]; + hits[0] = 0; + Mockito.doAnswer(answer -> { + if(hits[0] == 0) { + mockIdempotencyIssueBehaviours(abfsHttpOperation, normalOp1); + hits[0]++; + return abfsHttpOperation; + } + hits[0]++; + return normalOp2; + }).when(spiedOp).createHttpOperation(); + } + + private void mockIdempotencyIssueBehaviours(final AbfsHttpOperation abfsHttpOperation, + final AbfsHttpOperation normalOp) + throws IOException { + Mockito.doAnswer(answer -> { + normalOp.sendRequest(answer.getArgument(0), answer.getArgument(1), answer.getArgument(2)); + normalOp.processResponse(answer.getArgument(0), answer.getArgument(1), answer.getArgument(2)); + throw new SocketException("connection-reset"); + }).when(abfsHttpOperation).sendRequest(Mockito.nullable(byte[].class), + Mockito.nullable(int.class), Mockito.nullable(int.class)); + } + @Test public void testRenameRecoverySrcDestEtagSame() throws IOException { AzureBlobFileSystem fs = getFileSystem(); @@ -215,7 +265,6 @@ public void testRenameRecoverySrcDestEtagSame() throws IOException { String path2 = "/dummyFile2"; touch(new Path(path1)); - touch(new Path(path2)); // 404 and retry, send sourceEtag as null // source eTag matches -> rename should pass even when execute throws exception @@ -235,7 +284,6 @@ public void testRenameRecoverySrcDestEtagDifferent() throws Exception { String path2 = "/dummyFile2"; touch(new Path(path1)); - touch(new Path(path2)); // source eTag does not match -> throw exception AbfsRestOperationException e = intercept(AbfsRestOperationException.class, () -> From e36b4c2c2a8b2de2aefd3b94f27a4ac6694015ca Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 20 Mar 2023 18:47:49 +0000 Subject: [PATCH 09/10] HADOOP-18425. ABFS rename resilience: review feedback Integration testing all happy; had to do some work to get my auth mechanism work through the tests. 
Added test for dir handling, and commit renaming working through the failure. First time it's had this test, fwiw Change-Id: I89f7763d03d1a24a1a43361b001bfa5830804bc1 --- .../fs/azurebfs/services/AbfsClient.java | 16 +- .../services/AbfsClientRenameResult.java | 9 + .../azurebfs/services/AbfsRestOperation.java | 51 +-- .../services/TestAbfsRenameRetryRecovery.java | 309 ++++++++++++------ 4 files changed, 255 insertions(+), 130 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 7477ac4ae247c..1f1dd3296e190 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -545,7 +545,10 @@ public AbfsClientRenameResult renamePath( sourceEtag = extractEtagHeader(result); // and update the directory status. 
- isDir = isEmpty(sourceEtag); + final String resourceType = + result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + isDir = AbfsHttpConstants.DIRECTORY.equalsIgnoreCase(resourceType); + LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir); } String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); @@ -660,15 +663,14 @@ public boolean renameIdempotencyCheckOp( final boolean isDir) { Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response"); + LOG.debug("rename({}, {}) failure {}; retry={} isDir {} etag {}", + source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), isDir, sourceEtag); if (!(op.isARetriedRequest() && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) { - // this failed on the first attempt (no not retry related) - // *or* it was any error other than 404 - // do not attempt to recover from this failure. + // only attempt recovery if the failure was a 404 on a retried rename request. return false; } - LOG.debug("Source not found on retry of rename({}, {}) isDir {} etag {}", - source, destination, isDir, sourceEtag); + if (isDir) { // directory recovery is not supported. // log and fail. @@ -697,6 +699,8 @@ public boolean renameIdempotencyCheckOp( // or some other failure. log and swallow. 
LOG.debug("Failed to get status of path {}", destination, ex); } + } else { + LOG.debug("No source etag; unable to probe for the operation's success"); } return false; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java index 86e3473a9fe5d..62e27ffdc056d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientRenameResult.java @@ -58,4 +58,13 @@ public boolean isRenameRecovered() { public boolean isIncompleteMetadataState() { return isIncompleteMetadataState; } + + @Override + public String toString() { + return "AbfsClientRenameResult{" + + "op=" + op + + ", renameRecovered=" + renameRecovered + + ", isIncompleteMetadataState=" + isIncompleteMetadataState + + '}'; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index ad99020390a5e..94510fe3f79c6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -264,26 +264,7 @@ private boolean executeHttpOperation(final int retryCount, incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); tracingContext.constructHeader(httpOperation, failureReason); - switch(client.getAuthType()) { - case Custom: - case OAuth: - LOG.debug("Authenticating request with OAuth2 access token"); - httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, - client.getAccessToken()); - break; - case SAS: - // do nothing; the SAS token should 
already be appended to the query string - httpOperation.setMaskForSAS(); //mask sig/oid from url for logs - break; - case SharedKey: - // sign the HTTP request - LOG.debug("Signing request with shared key"); - // sign the HTTP request - client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), - hasRequestBody ? bufferLength : 0); - break; - } + signRequest(httpOperation, hasRequestBody ? bufferLength : 0); } catch (IOException e) { LOG.debug("Auth failure: {}, {}", method, url); throw new AbfsRestOperationException(-1, null, @@ -351,6 +332,36 @@ private boolean executeHttpOperation(final int retryCount, return true; } + /** + * Sign an operation. + * @param httpOperation operation to sign + * @param bytesToSign how many bytes to sign for shared key auth. + * @throws IOException failure + */ + @VisibleForTesting + public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException { + switch(client.getAuthType()) { + case Custom: + case OAuth: + LOG.debug("Authenticating request with OAuth2 access token"); + httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + break; + case SAS: + // do nothing; the SAS token should already be appended to the query string + httpOperation.setMaskForSAS(); //mask sig/oid from url for logs + break; + case SharedKey: + // sign the HTTP request + LOG.debug("Signing request with shared key"); + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + bytesToSign); + break; + } + } + /** * Creates new object of {@link AbfsHttpOperation} with the url, method, and * requestHeaders fields of the AbfsRestOperation object. 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 7ce4c60a1db5b..421ec5ff66280 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -18,33 +18,35 @@ package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; -import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.SocketException; +import java.net.URL; +import java.time.Duration; +import java.util.List; + import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.contract.ContractTestUtils; - -import javax.net.ssl.HttpsURLConnection; - -import java.io.IOException; -import 
java.net.HttpURLConnection; -import java.net.SocketException; -import java.net.URL; -import java.util.List; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -144,48 +146,8 @@ public void testRenameFailuresDueToIncompleteMetadata() throws Exception { AbfsClient getMockAbfsClient() throws IOException { AzureBlobFileSystem fs = getFileSystem(); - // specifying AbfsHttpOperation mock behavior - - // mock object representing the 404 path not found result -// AbfsHttpOperation mockHttp404Op = Mockito.mock(AbfsHttpOperation.class); -// Mockito.doReturn(404).when(mockHttp404Op).getStatusCode(); -// Mockito.doNothing().when(mockHttp404Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); -// Mockito.doNothing().when(mockHttp404Op).setRequestProperty(nullable(String.class), nullable(String.class)); -// Mockito.doNothing().when(mockHttp404Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); -// Mockito.doReturn("PUT").when(mockHttp404Op).getMethod(); -// Mockito.doReturn("Source Path not found").when(mockHttp404Op).getStorageErrorMessage(); -// Mockito.doReturn("SourcePathNotFound").when(mockHttp404Op).getStorageErrorCode(); -// -// -// // // mock object representing the 500 timeout result for first try of rename -// AbfsHttpOperation mockHttp500Op = Mockito.mock(AbfsHttpOperation.class); -// 
Mockito.doReturn(500).when(mockHttp500Op).getStatusCode(); -// Mockito.doThrow(IOException.class) -// .when(mockHttp500Op).processResponse(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); -// Mockito.doNothing().when(mockHttp500Op).setRequestProperty(nullable(String.class), nullable(String.class)); -// Mockito.doNothing().when(mockHttp500Op).sendRequest(nullable(byte[].class), Mockito.any(int.class), Mockito.any(int.class)); -// Mockito.doReturn("PUT").when(mockHttp500Op).getMethod(); -// Mockito.doReturn("ClientTimeoutError").when(mockHttp500Op).getStorageErrorCode(); -// -// // creating mock HttpUrlConnection object -// HttpURLConnection mockUrlConn = Mockito.mock(HttpsURLConnection.class); -// -// // tying all mocks together -// Mockito.doReturn(mockUrlConn).when(mockHttp404Op).getConnection(); -// Mockito.doReturn(mockUrlConn).when(mockHttp500Op).getConnection(); - // adding mock objects to current AbfsClient AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); - // Rest Operation is spied as it needs to have spyclient instance as a param to the constructor - // directly returning a mock for this would make the client instance null -// AbfsRestOperation mockRestOp = Mockito.spy(new AbfsRestOperation( -// AbfsRestOperationType.RenamePath, -// spyClient, -// HTTP_METHOD_PUT, -// null, -// null) -// ); - Mockito.doAnswer(answer -> { AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath, @@ -195,62 +157,68 @@ AbfsClient getMockAbfsClient() throws IOException { return spiedOp; }).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); -// Mockito.doReturn(mockRestOp).when(spyClient).createRenameRestOperation(nullable(URL.class), nullable(List.class)); -// -// Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).createHttpOperation(); - - - - -// Mockito.doReturn(mockHttp500Op).doReturn(mockHttp404Op).when(mockRestOp).getResult(); - -// 
Mockito.doReturn(true).when(mockRestOp).hasResult(); - - -// SharedKeyCredentials mockSharedKeyCreds = mock(SharedKeyCredentials.class); -// Mockito.doNothing().when(mockSharedKeyCreds).signRequest(Mockito.any(HttpURLConnection.class), Mockito.any(long.class)); -// // real method calls made once at start and once at end -// // for the two getPathStatus calls that actually have to be made -// Mockito.doCallRealMethod().doReturn(mockSharedKeyCreds).doReturn(mockSharedKeyCreds).doCallRealMethod().when(spyClient).getSharedKeyCredentials(); - return spyClient; } - private void addSpyBehavior(final AbfsRestOperation spiedOp, final AbfsRestOperation normalOp, AbfsClient client) + /** + * Spies on a rest operation to inject transient failure. + * The first createHttpOperation() invocation will return an abfs http operation + * which will fail. + * @param spiedRestOp spied operation whose createHttpOperation() will fail first time + * @param normalRestOp normal rest operation used to create the real http operations + * @param client client. 
+ * @throws IOException failure + */ + private void addSpyBehavior(final AbfsRestOperation spiedRestOp, + final AbfsRestOperation normalRestOp, + final AbfsClient client) throws IOException { - AbfsHttpOperation abfsHttpOperation = Mockito.spy(normalOp.createHttpOperation()); - AbfsHttpOperation normalOp1 = normalOp.createHttpOperation(); - normalOp1.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, - client.getAccessToken()); - AbfsHttpOperation normalOp2 = normalOp.createHttpOperation(); + AbfsHttpOperation failingOperation = Mockito.spy(normalRestOp.createHttpOperation()); + AbfsHttpOperation normalOp1 = normalRestOp.createHttpOperation(); + executeThenFail(client, normalRestOp, failingOperation, normalOp1); + AbfsHttpOperation normalOp2 = normalRestOp.createHttpOperation(); normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, client.getAccessToken()); - int[] hits = new int[1]; - hits[0] = 0; - Mockito.doAnswer(answer -> { - if(hits[0] == 0) { - mockIdempotencyIssueBehaviours(abfsHttpOperation, normalOp1); - hits[0]++; - return abfsHttpOperation; - } - hits[0]++; - return normalOp2; - }).when(spiedOp).createHttpOperation(); + when(spiedRestOp.createHttpOperation()) + .thenReturn(failingOperation) + .thenReturn(normalOp2); } - private void mockIdempotencyIssueBehaviours(final AbfsHttpOperation abfsHttpOperation, + /** + * Mock an idempotency failure by executing the normal operation, then + * raising an IOE. + * @param normalRestOp the rest operation used to sign the requests. 
+ * @param failingOperation failing operation + * @param normalOp good operation + * @throws IOException failure + */ + private void executeThenFail(final AbfsClient client, + final AbfsRestOperation normalRestOp, + final AbfsHttpOperation failingOperation, final AbfsHttpOperation normalOp) throws IOException { Mockito.doAnswer(answer -> { - normalOp.sendRequest(answer.getArgument(0), answer.getArgument(1), answer.getArgument(2)); - normalOp.processResponse(answer.getArgument(0), answer.getArgument(1), answer.getArgument(2)); + LOG.info("Executing first attempt with post-operation fault injection"); + final byte[] buffer = answer.getArgument(0); + final int offset = answer.getArgument(1); + final int length = answer.getArgument(2); + normalRestOp.signRequest(normalOp, length); + normalOp.sendRequest(buffer, offset, length); + normalOp.processResponse(buffer, offset, length); + LOG.info("Actual outcome is {} \"{}\" \"{}\"; injecting failure", + normalOp.getStatusCode(), + normalOp.getStorageErrorCode(), + normalOp.getStorageErrorMessage()); throw new SocketException("connection-reset"); - }).when(abfsHttpOperation).sendRequest(Mockito.nullable(byte[].class), + }).when(failingOperation).sendRequest(Mockito.nullable(byte[].class), Mockito.nullable(int.class), Mockito.nullable(int.class)); } + /** + * This is the good outcome: resilient rename. 
+ */ @Test public void testRenameRecoverySrcDestEtagSame() throws IOException { AzureBlobFileSystem fs = getFileSystem(); @@ -260,19 +228,30 @@ public void testRenameRecoverySrcDestEtagSame() throws IOException { AbfsClient mockClient = getMockAbfsClient(); - - String path1 = "/dummyFile1"; - String path2 = "/dummyFile2"; + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; touch(new Path(path1)); // 404 and retry, send sourceEtag as null // source eTag matches -> rename should pass even when execute throws exception - mockClient.renamePath(path1, path1, null, testTracingContext, null, false); + final AbfsClientRenameResult result = + mockClient.renamePath(path1, path2, null, testTracingContext, null, false); + Assertions.assertThat(result.isRenameRecovered()) + .describedAs("rename result recovered flag of %s", result) + .isTrue(); } + /** + * execute a failing rename but have the file at the far end not match. + * This is done by explicitly passing in a made up etag for the source + * etag and creating a file at the far end. + * The first rename will actually fail with a path exists exception, + * but as that is swallowed, it's not a problem. 
+ */ @Test - public void testRenameRecoverySrcDestEtagDifferent() throws Exception { + public void testRenameRecoverySourceDestEtagDifferent() throws Exception { AzureBlobFileSystem fs = getFileSystem(); TracingContext testTracingContext = getTestTracingContext(fs, false); @@ -280,19 +259,141 @@ public void testRenameRecoverySrcDestEtagDifferent() throws Exception { AbfsClient spyClient = getMockAbfsClient(); - String path1 = "/dummyFile1"; - String path2 = "/dummyFile2"; + String base = "/" + getMethodName(); + String path1 = base + "/dummyFile1"; + String path2 = base + "/dummyFile2"; - touch(new Path(path1)); + touch(new Path(path2)); // source eTag does not match -> throw exception - AbfsRestOperationException e = intercept(AbfsRestOperationException.class, () -> - spyClient.renamePath(path1, path2, null, testTracingContext, null, false)); - if (e.getErrorCode() != SOURCE_PATH_NOT_FOUND) { + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, "source", false)) + ); + } + + /** + * Assert that an exception failed with a specific error code. + * @param code code + * @param e exception + * @throws AbfsRestOperationException if there is a mismatch + */ + private static void expectErrorCode(final AzureServiceErrorCode code, + final AbfsRestOperationException e) throws AbfsRestOperationException { + if (e.getErrorCode() != code) { throw e; } } + /** + * Directory rename failure is unrecoverable. 
+ */ + @Test + public void testDirRenameRecoveryUnsupported() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + fs.mkdirs(new Path(path1)); + + // source is a directory: recovery is unsupported -> throw exception + expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false))); + } + + /** + * Even with failures, having a file at the destination must be rejected as already existing. + */ + @Test + public void testExistingPathCorrectlyRejected() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext)); + + AbfsClient spyClient = getMockAbfsClient(); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + touch(new Path(path1)); + touch(new Path(path2)); + + // destination already exists -> throw exception + expectErrorCode(PATH_ALREADY_EXISTS, intercept(AbfsRestOperationException.class, () -> + spyClient.renamePath(path1, path2, null, testTracingContext, null, false))); + } + + /** + * Test the resilient commit code works through fault injection, including + * reporting recovery. 
+ */ + @Test + public void testResilientCommitOperation() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + final AzureBlobFileSystemStore store = fs.getAbfsStore(); + Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext)); + + // patch in the mock abfs client to the filesystem, for the resilient + // commit API to pick up. + setAbfsClient(store, getMockAbfsClient()); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + final Path source = new Path(path1); + touch(source); + final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); + + final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); + final Pair outcome = + commit.commitSingleFileByRename(source, new Path(path2), sourceTag); + Assertions.assertThat(outcome.getKey()) + .describedAs("recovery flag") + .isTrue(); + } + /** + * Test that the resilient commit fails with a FileNotFoundException when + * the etag passed in does not match the source file. + */ + @Test + public void testResilientCommitOperationTagMismatch() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext testTracingContext = getTestTracingContext(fs, false); + + final AzureBlobFileSystemStore store = fs.getAbfsStore(); + Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext)); + + // patch in the mock abfs client to the filesystem, for the resilient + // commit API to pick up. 
+ setAbfsClient(store, getMockAbfsClient()); + + String base = "/" + getMethodName(); + String path1 = base + "/dummyDir1"; + String path2 = base + "/dummyDir2"; + + + final Path source = new Path(path1); + touch(source); + final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag(); + + final ResilientCommitByRename commit = fs.createResilientCommitSupport(source); + intercept(FileNotFoundException.class, () -> + commit.commitSingleFileByRename(source, new Path(path2), "not the right tag")); + } + /** * Method to create an AbfsRestOperationException. * @param statusCode status code to be used. From a303e33c02616db4c1b165c1797b60cad5801597 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 20 Mar 2023 22:00:49 +0000 Subject: [PATCH 10/10] HADOOP-18425. getPathStatus() on rename downgrades to returning false Change-Id: Iceb0042b2d97725d0864d138d3a522f29fb5c867 --- .../fs/azurebfs/services/AbfsClient.java | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 1f1dd3296e190..582e4969a60db 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; @@ -78,7 +79,9 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; /** * AbfsClient. @@ -539,16 +542,25 @@ public AbfsClientRenameResult renamePath( if (!hasEtag && renameResilience) { // no etag was passed in and rename resilience is enabled, so // get the value - final AbfsRestOperation srcStatusOp = getPathStatus(source, - false, tracingContext); - final AbfsHttpOperation result = srcStatusOp.getResult(); - - sourceEtag = extractEtagHeader(result); - // and update the directory status. - final String resourceType = - result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - isDir = AbfsHttpConstants.DIRECTORY.equalsIgnoreCase(resourceType); - LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir); + + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext); + final AbfsHttpOperation result = srcStatusOp.getResult(); + + sourceEtag = extractEtagHeader(result); + // and update the directory status. 
+ final String resourceType = + result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + isDir = AbfsHttpConstants.DIRECTORY.equalsIgnoreCase(resourceType); + LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir); + } catch (AbfsRestOperationException e) { + // switch file not found to source not found + if (PATH_NOT_FOUND.equals(e.getErrorCode())) { + throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(), + e.getMessage(), e); + } + } } String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);