From a4122e276ad2264c6303eecc3584b63f865dd353 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 8 Mar 2023 14:35:59 +0000 Subject: [PATCH 1/2] HADOOP-18657. Tune ABFS create() retry logic Production code changes; no tests yet. Something with mockito is going to be needed here Change-Id: I430a9f0e6796461ccec8c23cd80d024258703048 --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 45 ++++++++++++++----- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e5e7056126564..9dd252405d382 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.services.AbfsErrors; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; @@ -611,7 +612,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa final String umask, final boolean isAppendBlob, TracingContext tracingContext) throws AzureBlobFileSystemException { - AbfsRestOperation op; + AbfsRestOperation op = null; try { // Trigger a create with overwrite=false first so that eTag fetch can be @@ -621,37 +622,57 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa isAppendBlob, null, tracingContext); } catch (AbfsRestOperationException e) { + LOG.debug("Failed to create {}", relativePath, e); if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { // File pre-exists, fetch eTag + LOG.debug("Fetching etag of {}", relativePath); try { op = client.getPathStatus(relativePath, false, tracingContext); } catch (AbfsRestOperationException ex) { + LOG.debug("Failed to to getPathStatus {}", relativePath, ex); if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { // Is a parallel access case, as file which was found to be // present went missing by this request. - throw new ConcurrentWriteOperationDetectedException( - "Parallel access to the create path detected. Failing request " - + "to honor single writer semantics"); + // this means the other thread deleted it and the conflict + // has implicitly been resolved. + LOG.debug("File at {} has been deleted; creation can continue", relativePath); } else { throw ex; } } - String eTag = op.getResult() - .getResponseHeader(HttpHeaderConfigurations.ETAG); + String eTag = op != null + ? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG) + : null; + LOG.debug("Attempting to create file {} with etag of {}", relativePath, eTag); try { - // overwrite only if eTag matches with the file properties fetched befpre - op = client.createPath(relativePath, true, true, permission, umask, + // overwrite only if eTag matches with the file properties fetched or the file + // was deleted and there is no etag. + // if the etag was not retrieved, overwrite is still false, so will fail + // if another process has just created the file + op = client.createPath(relativePath, true, eTag != null, permission, umask, isAppendBlob, eTag, tracingContext); } catch (AbfsRestOperationException ex) { - if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { + final int sc = ex.getStatusCode(); + LOG.debug("Failed to create file {} with etag {}; status code={}", + relativePath, eTag, sc, ex); + if (sc == HttpURLConnection.HTTP_PRECON_FAILED + || sc == HttpURLConnection.HTTP_CONFLICT) { // Is a parallel access case, as file with eTag was just queried // and precondition failure can happen only when another file with // different etag got created. - throw new ConcurrentWriteOperationDetectedException( - "Parallel access to the create path detected. Failing request " - + "to honor single writer semantics"); + // OR leasing is enabled on the directory and this client + // does not have the lease. + final ConcurrentWriteOperationDetectedException ex2 = + new ConcurrentWriteOperationDetectedException( + AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED + + " Path =\"" + relativePath+ "\"" + + "; Status code =" + sc + + "; etag = \"" + eTag + "\"" + + "; error =" + ex.getErrorMessage()); + ex2.initCause(ex); + throw ex2; } else { throw ex; } From 1853a46ecbb41baf82035664e30cf03584b77a64 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 15 Mar 2023 20:36:47 +0000 Subject: [PATCH 2/2] HADOOP-18657. abfs-create-resilience log error at warn; full stack at DEBUG. Change-Id: Id05d8d0fa0d5913529cbaa093bf7cf8ed09d5031 --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 47 ++++++++++++++----- ...urrentWriteOperationDetectedException.java | 5 ++ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 9dd252405d382..2a35dcddbba6d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -641,22 +641,35 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa } } + // get the etag of the file at the destination; this will be made + // the condition of the second createPath call. String eTag = op != null ? op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG) : null; - LOG.debug("Attempting to create file {} with etag of {}", relativePath, eTag); + final boolean overwrite = eTag != null; + final String action = overwrite ? "overwrite" : "create"; + LOG.debug("Attempting to {} file {} with etag of {}", + action, + relativePath, eTag); try { // overwrite only if eTag matches with the file properties fetched or the file // was deleted and there is no etag. // if the etag was not retrieved, overwrite is still false, so will fail // if another process has just created the file - op = client.createPath(relativePath, true, eTag != null, permission, umask, + + op = client.createPath(relativePath, true, overwrite, permission, umask, isAppendBlob, eTag, tracingContext); + } catch (AbfsRestOperationException ex) { final int sc = ex.getStatusCode(); - LOG.debug("Failed to create file {} with etag {}; status code={}", - relativePath, eTag, sc, ex); + + // Create a detailed error message. + final String details = "Path =\"" + relativePath + "\"" + + "; Status code =" + sc + + "; etag = \"" + eTag + "\"" + + "; operation = \"" + action + "\"" + + "; error =" + ex.getErrorMessage(); if (sc == HttpURLConnection.HTTP_PRECON_FAILED || sc == HttpURLConnection.HTTP_CONFLICT) { // Is a parallel access case, as file with eTag was just queried @@ -664,16 +677,24 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa // different etag got created. // OR leasing is enabled on the directory and this client // does not have the lease. - final ConcurrentWriteOperationDetectedException ex2 = - new ConcurrentWriteOperationDetectedException( - AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED - + " Path =\"" + relativePath+ "\"" - + "; Status code =" + sc - + "; etag = \"" + eTag + "\"" - + "; error =" + ex.getErrorMessage()); - ex2.initCause(ex); - throw ex2; + + + final String errorText = AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED + " " + details; + + // Add a message to the log, including causes + LOG.warn("{}.", errorText); + LOG.warn("This is a race condition or another process has a lease on" + + " the parent directory."); + // log full stack trace at debug + LOG.debug("{}", errorText, ex); + // then throw a specific exception class + throw new ConcurrentWriteOperationDetectedException(errorText, ex); } else { + // another cause. warn + LOG.warn("Failed {}", details); + // print the stack at debug + LOG.debug("{}", details, ex); + // throw without wrapping throw ex; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java index 79813ddfe6400..b6a53b98118d5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java @@ -29,4 +29,9 @@ public class ConcurrentWriteOperationDetectedException public ConcurrentWriteOperationDetectedException(String message) { super(message); } + + public ConcurrentWriteOperationDetectedException(final String message, + final Throwable innerThrowable) { + super(message, innerThrowable); + } }