Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,30 @@ public AbfsRestOperation createPath(final String path,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsRestOperation op;
if (isFileCreation) {
// Create a file with the specified parameters
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
contextEncryptionAdapter, tracingContext);
AbfsRestOperation statusOp = null;
try {
// Check if the file already exists by calling GetPathStatus
statusOp = getPathStatus(path, false, tracingContext, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of override true, flow might come here with already a Head call done on path.
Can we avoid this head call in that case?

} catch (AbfsRestOperationException ex) {
// If the path does not exist, continue with file creation
// For other errors, rethrow the exception
if (ex.getStatusCode() != HTTP_NOT_FOUND) {
throw ex;
}
}

// If the file exists and overwrite is not allowed, throw conflict
if (statusOp != null && statusOp.hasResult() && !overwrite) {
throw new AbfsRestOperationException(
HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
PATH_EXISTS,
null);
} else {
// Proceed with file creation (force overwrite = true)
op = createFile(path, true, permissions, isAppendBlob, eTag,
contextEncryptionAdapter, tracingContext);
}
} else {
// Create a directory with the specified parameters
op = createDirectory(path, permissions, isAppendBlob, eTag,
Expand Down Expand Up @@ -584,7 +605,6 @@ public AbfsRestOperation createPathRestOp(final String path,
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}

final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.PutBlob,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin,
if (key == null) {
key = "HTTP Response";
}
String values = StringUtils.join(";", entry.getValue());
List<String> valuesList = entry.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to null pointer exceptions on enabling AbfsIoUtils logging if value is null.

if (valuesList == null) {
valuesList = Collections.emptyList();
} else {
valuesList = valuesList.stream()
.map(v -> v == null ? "" : v) // replace null with empty string
.collect(Collectors.toList());
}
String values = StringUtils.join(";", valuesList);
if (key.contains("Cookie")) {
values = "*cookie info*";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -96,6 +97,7 @@
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -2236,6 +2238,98 @@ public void testFailureInGetPathStatusDuringCreateRecovery() throws Exception {
}
}

/**
* Test to simulate a successful create operation followed by a connection reset
* on the response, triggering a retry.
*
* This test verifies that the create operation is retried in the event of a
* connection reset during the response phase. The test creates a mock
* AzureBlobFileSystem and its associated components to simulate the create
* operation and the connection reset. It then verifies that the create
* operation is retried once before succeeding.
*
* @throws Exception if an error occurs during the test execution.
*/
@Test
public void testCreateIdempotencyForNonHnsBlob() throws Exception {
assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND BLOB").isFalse();
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move all assume before any other statement


// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
fs.getAbfsStore().setClient(blobClient);
fs.getAbfsStore().setClientHandler(clientHandler);
// Set up the spies to return the mocked objects
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

AtomicInteger createCount = new AtomicInteger(0);

Mockito.doAnswer(answer -> {
// Set up the mock for the create operation
AbfsClientTestUtil.setMockAbfsRestOperationForCreateOperation(blobClient,
(httpOperation) -> {
Mockito.doAnswer(invocation -> {
// Call the real processResponse method
invocation.callRealMethod();

int currentCount = createCount.incrementAndGet();
if (currentCount == 2) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
}
return null;
}).when(httpOperation).processResponse(
Mockito.nullable(byte[].class),
Mockito.anyInt(),
Mockito.anyInt()
);

return httpOperation;
});
return answer.callRealMethod();
}).when(blobClient).createPath(
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.any(AzureBlobFileSystemStore.Permissions.class),
Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class),
any(TracingContext.class)
);

Path path = new Path("/test/file");
fs.create(path, false);
Mockito.verify(blobClient, Mockito.times(1)).createPath(
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.any(AzureBlobFileSystemStore.Permissions.class),
Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class),
any(TracingContext.class));

Mockito.verify(blobClient, Mockito.times(2)).createPathRestOp(
Mockito.anyString(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.anyBoolean(),
Mockito.nullable(String.class), Mockito.any(ContextEncryptionAdapter.class),
any(TracingContext.class));
assertIsFile(fs, path);
}
}

/**
* Mocks and returns an instance of {@link AbfsDfsClient} for the given AzureBlobFileSystem.
* This method sets up the necessary mock behavior for the client handler and ingress client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
Expand All @@ -74,6 +75,7 @@
import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
Expand Down Expand Up @@ -108,6 +110,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assumptions.assumeThat;

/**
* Test rename operation.
Expand Down Expand Up @@ -1702,6 +1705,85 @@ public void testRenamePathRetryIdempotency() throws Exception {
}
}

/**
* Test to simulate a successful copy blob operation followed by a connection reset
* on the response, triggering a retry.
*
* This test verifies that the copy blob operation is retried in the event of a
* connection reset during the response phase. The test creates a mock
* AzureBlobFileSystem and its associated components to simulate the copy blob
* operation and the connection reset. It then verifies that the create
* operation is retried once before succeeding.
*
* @throws Exception if an error occurs during the test execution.
*/
@Test
public void testRenameIdempotencyForNonHnsBlob() throws Exception {
assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND BLOB").isFalse();
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, move all assume to first few lines

// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();

// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
fs.getAbfsStore().setClient(blobClient);
fs.getAbfsStore().setClientHandler(clientHandler);
// Set up the spies to return the mocked objects
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

AtomicInteger copyBlobCount = new AtomicInteger(0);
Path sourceDir = path("/testSrc");
assertMkdirs(fs, sourceDir);
String filename = "file1";
Path sourceFilePath = new Path(sourceDir, filename);
touch(sourceFilePath);
Path destFilePath = new Path(sourceDir, "file2");
Mockito.doAnswer(answer -> {
// Set up the mock for the create operation
AbfsClientTestUtil.setMockAbfsRestOperationForCopyBlobOperation(blobClient, sourceFilePath, destFilePath,
(httpOperation) -> {
Mockito.doAnswer(invocation -> {
// Call the real processResponse method
invocation.callRealMethod();

int currentCount = copyBlobCount.incrementAndGet();
if (currentCount == 1) {
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
}
return null;
}).when(httpOperation).processResponse(
Mockito.nullable(byte[].class),
Mockito.anyInt(),
Mockito.anyInt()
);

return httpOperation;
});
return answer.callRealMethod();
}).when(blobClient).copyBlob(
Mockito.any(Path.class),
Mockito.any(Path.class),
Mockito.nullable(String.class),
Mockito.any(TracingContext.class)
);
Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
.describedAs("Rename should succeed.")
.isTrue();
}
}

/**
* Test to verify that the client transaction ID is included in the response header
* after renaming a file in Azure Blob Storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ public void testScenario8() throws Exception {
}
Assertions.assertThat(e.getMessage())
.as("Expected error message to contain 'AlreadyExists'")
.contains("AlreadyExists");
.containsIgnoringCase("Exists");
}

// Remove file
Expand Down
Loading