diff --git a/hadoop-ozone/integration-test-s3/pom.xml b/hadoop-ozone/integration-test-s3/pom.xml index 08251ff35d04..48a0faa1e076 100644 --- a/hadoop-ozone/integration-test-s3/pom.xml +++ b/hadoop-ozone/integration-test-s3/pom.xml @@ -112,6 +112,11 @@ s3 test + + software.amazon.awssdk + s3-transfer-manager + test + software.amazon.awssdk sdk-core diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/S3ClientFactory.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/S3ClientFactory.java index 5f80a0907c5f..78db8f8ac3c6 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/S3ClientFactory.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/S3ClientFactory.java @@ -39,7 +39,11 @@ import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; /** * Factory class for creating S3 clients. @@ -127,6 +131,23 @@ public S3Client createS3ClientV2() throws Exception { * @throws Exception if there is an error creating the client */ public S3Client createS3ClientV2(boolean enablePathStyle) throws Exception { + S3ClientBuilder builder = S3Client.builder(); + configureCommon(builder, enablePathStyle); + return builder.build(); + } + + public S3AsyncClient createS3AsyncClientV2() throws Exception { + return createS3AsyncClientV2(true); + } + + public S3AsyncClient createS3AsyncClientV2(boolean enablePathStyle) throws Exception { + S3AsyncClientBuilder builder = S3AsyncClient.builder(); + configureCommon(builder, enablePathStyle); + return builder.build(); + } + + private > void configureCommon(T builder, boolean enablePathStyle) + throws Exception { final String accessKey = "user"; final String secretKey = "password"; final Region region = Region.US_EAST_1; @@ -151,11 +172,9 @@ public S3Client createS3ClientV2(boolean enablePathStyle) throws Exception { AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey); - return S3Client.builder() - .region(region) + builder.region(region) .endpointOverride(new URI(endpoint)) .credentialsProvider(StaticCredentialsProvider.create(credentials)) - .forcePathStyle(enablePathStyle) - .build(); + .forcePathStyle(enablePathStyle); } } diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java index 221839e9c7b9..6087787e2dfa 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java @@ -63,6 +63,7 @@ import org.junit.jupiter.api.io.TempDir; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; @@ -83,6 +84,10 @@ import software.amazon.awssdk.services.s3.model.Tagging; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.model.FileDownload; +import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload; /** * This is an abstract class to test the AWS Java S3 SDK operations. @@ -103,6 +108,7 @@ public abstract class AbstractS3SDKV2Tests extends OzoneTestBase { private static MiniOzoneCluster cluster = null; private static S3Client s3Client = null; + private static S3AsyncClient s3AsyncClient = null; /** * Create a MiniOzoneCluster with S3G enabled for testing. @@ -116,7 +122,10 @@ static void startCluster(OzoneConfiguration conf) throws Exception { .setNumDatanodes(5) .build(); cluster.waitForClusterToBeReady(); - s3Client = new S3ClientFactory(s3g.getConf()).createS3ClientV2(); + + S3ClientFactory s3Factory = new S3ClientFactory(s3g.getConf()); + s3Client = s3Factory.createS3ClientV2(); + s3AsyncClient = s3Factory.createS3AsyncClientV2(); } /** @@ -340,6 +349,46 @@ public void testLowLevelMultipartUpload(@TempDir Path tempDir) throws Exception assertEquals(userMetadata, headObjectResponse.metadata()); } + @Test + public void testResumableDownloadWithEtagMismatch() throws Exception { + // Arrange + final String bucketName = getBucketName("resumable"); + final String keyName = getKeyName("resumable"); + final String fileContent = "This is a test file for resumable download."; + s3Client.createBucket(b -> b.bucket(bucketName)); + s3Client.putObject(b -> b.bucket(bucketName).key(keyName), RequestBody.fromString(fileContent)); + + // Prepare a temp file for download + Path downloadPath = Files.createTempFile("downloaded", ".txt"); + + // Set up S3TransferManager + try (S3TransferManager transferManager = + S3TransferManager.builder().s3Client(s3AsyncClient).build()) { + + // First download + DownloadFileRequest downloadRequest = DownloadFileRequest.builder() + .getObjectRequest(b -> b.bucket(bucketName).key(keyName)) + .destination(downloadPath) + .build(); + FileDownload download = transferManager.downloadFile(downloadRequest); + ResumableFileDownload resumableFileDownload = download.pause(); + + // Simulate etag mismatch by modifying the file in S3 + final String newContent = "This is new content to cause etag mismatch."; + s3Client.putObject(b -> b.bucket(bucketName).key(keyName), RequestBody.fromString(newContent)); + + // Resume download + FileDownload resumedDownload = transferManager.resumeDownloadFile(resumableFileDownload); + resumedDownload.completionFuture().get(); + + String downloadedContent = new String(Files.readAllBytes(downloadPath), StandardCharsets.UTF_8); + assertEquals(newContent, downloadedContent); + + File downloadFile = downloadPath.toFile(); + assertTrue(downloadFile.delete()); + } + } + private String getBucketName() { return getBucketName(""); } diff --git a/pom.xml b/pom.xml index 81d7bac77ec0..18f33fec3bf2 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,7 @@ 1.9.7 3.27.3 1.12.661 - 2.31.25 + 2.31.40 0.8.0.RELEASE 1.80 3.6.0