Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public class AbfsCountersImpl implements AbfsCounters {
BYTES_RECEIVED,
READ_THROTTLES,
WRITE_THROTTLES,
SERVER_UNAVAILABLE
SERVER_UNAVAILABLE,
RENAME_RECOVERY,
METADATA_INCOMPLETE_RENAME_FAILURES,
RENAME_PATH_ATTEMPTS

};

private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ public enum AbfsStatistic {
AbfsHttpConstants.HTTP_METHOD_PATCH),
HTTP_POST_REQUEST(StoreStatisticNames.ACTION_HTTP_POST_REQUEST,
"Time taken to complete a POST request",
AbfsHttpConstants.HTTP_METHOD_POST);
AbfsHttpConstants.HTTP_METHOD_POST),

// Rename recovery
RENAME_RECOVERY("rename_recovery",
"Number of times Rename recoveries happened"),
METADATA_INCOMPLETE_RENAME_FAILURES("metadata_incomplete_rename_failures",
"Number of times rename operation failed due to metadata being "
+ "incomplete"),
RENAME_PATH_ATTEMPTS("rename_path_attempts",
"Number of times we attempt to rename a path internally");

private String statName;
private String statDescription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ public boolean failed() {
}

@VisibleForTesting
AzureBlobFileSystemStore getAbfsStore() {
public AzureBlobFileSystemStore getAbfsStore() {
return abfsStore;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -97,6 +96,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
Expand Down Expand Up @@ -132,6 +132,8 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;

import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
Expand Down Expand Up @@ -919,18 +921,19 @@ public boolean rename(final Path source,

do {
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
final Pair<AbfsRestOperation, Boolean> pair =
final AbfsClientRenameResult abfsClientRenameResult =
client.renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag);
continuation, tracingContext, sourceEtag, false);

AbfsRestOperation op = pair.getLeft();
AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
countAggregate++;
shouldContinue = continuation != null && !continuation.isEmpty();
// update the recovery flag.
recovered |= pair.getRight();
recovered |= abfsClientRenameResult.isRenameRecovered();
populateRenameRecoveryStatistics(abfsClientRenameResult);
if (!shouldContinue) {
perfInfo.registerAggregates(startAggregate, countAggregate);
}
Expand Down Expand Up @@ -1905,7 +1908,7 @@ public AzureBlobFileSystemStoreBuilder build() {
}

@VisibleForTesting
AbfsClient getClient() {
public AbfsClient getClient() {
return this.client;
}

Expand Down Expand Up @@ -1973,4 +1976,19 @@ public static String extractEtagHeader(AbfsHttpOperation result) {
}
return etag;
}

/**
* Increment rename recovery based counters in IOStatistics.
*
* @param abfsClientRenameResult Result of an ABFS rename operation.
*/
private void populateRenameRecoveryStatistics(
AbfsClientRenameResult abfsClientRenameResult) {
if (abfsClientRenameResult.isRenameRecovered()) {
abfsCounters.incrementCounter(RENAME_RECOVERY, 1);
}
if (abfsClientRenameResult.isIncompleteMetadataState()) {
abfsCounters.incrementCounter(METADATA_INCOMPLETE_RENAME_FAILURES, 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
Expand All @@ -51,7 +52,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
Expand All @@ -69,13 +69,15 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;

/**
* AbfsClient.
Expand All @@ -102,6 +104,10 @@ public class AbfsClient implements Closeable {

private final ListeningScheduledExecutorService executorService;

/** logging the rename failure if metadata is in an incomplete state. */
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE =
new LogExactlyOnce(LOG);

private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext)
Expand Down Expand Up @@ -496,15 +502,19 @@ public AbfsRestOperation breakLease(final String path,
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @return pair of (the rename operation, flag indicating recovery took place)
* @param isMetadataIncompleteState was there a rename failure due to
* incomplete metadata state?
* @return AbfsClientRenameResult result of rename operation indicating the
* AbfsRest operation, rename recovery and incomplete metadata state failure.
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
*/
public Pair<AbfsRestOperation, Boolean> renamePath(
public AbfsClientRenameResult renamePath(
final String source,
final String destination,
final String continuation,
final TracingContext tracingContext,
final String sourceEtag)
final String sourceEtag,
boolean isMetadataIncompleteState)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

Expand All @@ -531,13 +541,45 @@ public Pair<AbfsRestOperation, Boolean> renamePath(
url,
requestHeaders);
try {
incrementAbfsRenamePath();
op.execute(tracingContext);
return Pair.of(op, false);
// AbfsClientResult contains the AbfsOperation, If recovery happened or
// not, and the incompleteMetaDataState is true or false.
// If we successfully rename a path and isMetadataIncompleteState was
// true, then rename was recovered, else it didn't, this is why
// isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState);
} catch (AzureBlobFileSystemException e) {
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}

// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
&& !isMetadataIncompleteState) {
//Logging
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");

// Doing a HEAD call resolves the incomplete metadata state and
// then we can retry the rename operation.
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
tracingContext);
isMetadataIncompleteState = true;
// Extract the sourceEtag, using the status Op, and set it
// for future rename recovery.
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;

boolean etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
Expand All @@ -546,10 +588,14 @@ public Pair<AbfsRestOperation, Boolean> renamePath(
// throw back the exception
throw e;
}
return Pair.of(op, true);
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
}
}

private void incrementAbfsRenamePath() {
abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1);
}

/**
* Check if the rename request failure is post a retry and if earlier rename
* request might have succeeded at back-end.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

/**
* A class to store the Result of an AbfsClient rename operation, signifying the
* AbfsRestOperation result and the rename recovery.
*/
public class AbfsClientRenameResult {

/** Abfs Rest Operation. */
private final AbfsRestOperation op;
/** Flag indicating recovery took place. */
private final boolean renameRecovered;
/** Abfs storage tracking metadata is in an incomplete state. */
private final boolean isIncompleteMetadataState;

/**
* Constructing an ABFS rename operation result.
* @param op The AbfsRestOperation.
* @param renameRecovered Did rename recovery took place?
* @param isIncompleteMetadataState Did the rename failed due to incomplete
* metadata state and had to be retried?
*/
public AbfsClientRenameResult(
AbfsRestOperation op,
boolean renameRecovered,
boolean isIncompleteMetadataState) {
this.op = op;
this.renameRecovered = renameRecovered;
this.isIncompleteMetadataState = isIncompleteMetadataState;
}

public AbfsRestOperation getOp() {
return op;
}

public boolean isRenameRecovered() {
return renameRecovered;
}

public boolean isIncompleteMetadataState() {
return isIncompleteMetadataState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ public void testSignatureMask() throws Exception {
fs.create(new Path(src)).close();
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
.renamePath(src, "/testABC" + "/abc.txt", null,
getTestTracingContext(fs, false), null)
.getLeft();
getTestTracingContext(fs, false), null, false)
.getOp();
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
String url = result.getMaskedUrl();
String encodedUrl = result.getMaskedEncodedUrl();
Expand All @@ -419,7 +419,7 @@ public void testSignatureMaskOnExceptionMessage() throws Exception {
intercept(IOException.class, "sig=XXXX",
() -> getFileSystem().getAbfsClient()
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
getTestTracingContext(getFileSystem(), false), null));
getTestTracingContext(getFileSystem(), false), null, false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;

/**
* Test rename operation.
Expand Down Expand Up @@ -167,4 +172,30 @@ public void testPosixRenameDirectory() throws Exception {
new Path(testDir2 + "/test1/test2/test3"));
}

@Test
public void testRenameWithNoDestinationParentDir() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add similar test case for resilient rename api, here or in ITestAbfsManifestStoreOperations

describe("Verifying the expected behaviour of ABFS rename when "
+ "destination parent Dir doesn't exist.");

final AzureBlobFileSystem fs = getFileSystem();
Path sourcePath = path(getMethodName());
Path destPath = new Path("falseParent", "someChildFile");

byte[] data = dataset(1024, 'a', 'z');
writeDataset(fs, sourcePath, data, data.length, 1024, true);

// Verify that renaming on a destination with no parent dir wasn't
// successful.
assertFalse("Rename result expected to be false with no Parent dir",
fs.rename(sourcePath, destPath));

// Verify that metadata was in an incomplete state after the rename
// failure, and we retired the rename once more.
IOStatistics ioStatistics = fs.getIOStatistics();
IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
RENAME_PATH_ATTEMPTS.getStatName())
.describedAs("There should be 2 rename attempts if metadata "
+ "incomplete state failure is hit")
.isEqualTo(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,8 @@ private void testRenamePath(final boolean isWithCPK) throws Exception {
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.renamePath(testFileName, newName, null,
getTestTracingContext(fs, false), null)
.getLeft();
getTestTracingContext(fs, false), null, false)
.getOp();
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);

Expand Down
Loading