diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java index bbe062d5db48..7e0ca6ed10b2 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; @@ -30,9 +31,10 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.ObjectVersion; +import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable; import software.amazon.awssdk.services.s3control.S3ControlClient; import software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest; import software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest; @@ -94,28 +96,46 @@ public static String testAccountId() { } public static void cleanS3Bucket(S3Client s3, String bucketName, String prefix) { - boolean hasContent = true; - while (hasContent) { - ListObjectsV2Response response = - s3.listObjectsV2( - ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()); - hasContent = response.hasContents(); - if (hasContent) { - s3.deleteObjects( - DeleteObjectsRequest.builder() - .bucket(bucketName) - .delete( - Delete.builder() - .objects( - response.contents().stream() - .map(obj -> ObjectIdentifier.builder().key(obj.key()).build()) - .collect(Collectors.toList())) - .build()) - .build()); - } + ListObjectVersionsIterable response = + s3.listObjectVersionsPaginator( + ListObjectVersionsRequest.builder().bucket(bucketName).prefix(prefix).build()); + List versionsToDelete = Lists.newArrayList(); + int batchDeletionSize = 1000; + response.versions().stream() + .forEach( + version -> { + versionsToDelete.add(version); + if (versionsToDelete.size() == batchDeletionSize) { + deleteObjectVersions(s3, bucketName, versionsToDelete); + versionsToDelete.clear(); + } + }); + + if (!versionsToDelete.isEmpty()) { + deleteObjectVersions(s3, bucketName, versionsToDelete); } } + private static void deleteObjectVersions( + S3Client s3, String bucket, List objectVersions) { + s3.deleteObjects( + DeleteObjectsRequest.builder() + .bucket(bucket) + .delete( + Delete.builder() + .objects( + objectVersions.stream() + .map( + obj -> + ObjectIdentifier.builder() + .key(obj.key()) + .versionId(obj.versionId()) + .build()) + .collect(Collectors.toList())) + .build()) + .build()); + } + public static void cleanGlueCatalog(GlueClient glue, List namespaces) { for (String namespace : namespaces) { try { diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index 18abb82ce74a..cacf04891896 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -53,14 +53,17 @@ import software.amazon.awssdk.services.kms.model.ListAliasesResponse; import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.BucketVersioningStatus; import software.amazon.awssdk.services.s3.model.GetObjectAclRequest; import software.amazon.awssdk.services.s3.model.GetObjectAclResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; import software.amazon.awssdk.services.s3.model.Permission; +import software.amazon.awssdk.services.s3.model.PutBucketVersioningRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; +import software.amazon.awssdk.services.s3.model.VersioningConfiguration; import software.amazon.awssdk.services.s3control.S3ControlClient; import software.amazon.awssdk.utils.ImmutableMap; import software.amazon.awssdk.utils.IoUtils; @@ -106,6 +109,12 @@ public static void beforeClass() { AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName); AwsIntegTestUtil.createAccessPoint( crossRegionS3Control, crossRegionAccessPointName, crossRegionBucketName); + s3.putBucketVersioning( + PutBucketVersioningRequest.builder() + .bucket(bucketName) + .versioningConfiguration( + VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build()) + .build()); } @AfterAll @@ -445,6 +454,35 @@ public void testPrefixDelete() { }); } + @Test + public void testFileRecoveryHappyPath() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties()); + String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet"); + write(s3FileIO, filePath); + s3FileIO.deleteFile(filePath); + assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse(); + + assertThat(s3FileIO.recoverFile(filePath)).isTrue(); + assertThat(s3FileIO.newInputFile(filePath).exists()).isTrue(); + } + + @Test + public void testFileRecoveryFailsToRecover() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties()); + s3.putBucketVersioning( + PutBucketVersioningRequest.builder() + .bucket(bucketName) + .versioningConfiguration( + VersioningConfiguration.builder().status(BucketVersioningStatus.SUSPENDED).build()) + .build()); + String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "unversionedFile.parquet"); + write(s3FileIO, filePath); + s3FileIO.deleteFile(filePath); + assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse(); + + assertThat(s3FileIO.recoverFile(filePath)).isFalse(); + } + private S3FileIOProperties getDeletionTestProperties() { S3FileIOProperties properties = new S3FileIOProperties(); properties.setDeleteBatchSize(deletionBatchSize); diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index dd13e13f01a6..f7d2da5eb907 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -20,8 +20,10 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -37,6 +39,7 @@ import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsRecoveryOperations; import org.apache.iceberg.metrics.MetricsContext; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -52,6 +55,7 @@ import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; @@ -61,10 +65,12 @@ import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.ObjectVersion; import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.Tag; import software.amazon.awssdk.services.s3.model.Tagging; +import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable; /** * FileIO implementation backed by S3. @@ -73,7 +79,7 @@ * schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes * will result in {@link org.apache.iceberg.exceptions.ValidationException}. */ -public class S3FileIO implements CredentialSupplier, DelegateFileIO { +public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRecoveryOperations { private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class); private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext"; @@ -420,4 +426,46 @@ protected void finalize() throws Throwable { } } } + + @Override + public boolean recoverFile(String path) { + S3URI location = new S3URI(path, s3FileIOProperties.bucketToAccessPointMapping()); + ListObjectVersionsIterable response = + client() + .listObjectVersionsPaginator( + builder -> builder.bucket(location.bucket()).prefix(location.key())); + + // Recover to the last modified version, not isLatest, + // since isLatest is true for deletion markers. + Optional recoverVersion = + response.versions().stream().max(Comparator.comparing(ObjectVersion::lastModified)); + + return recoverVersion.map(version -> recoverObject(version, location.bucket())).orElse(false); + } + + private boolean recoverObject(ObjectVersion version, String bucket) { + if (version.isLatest()) { + return true; + } + + LOG.info("Attempting to recover object {}", version.key()); + try { + // Perform a copy instead of deleting the delete marker + // so that recovery does not rely on delete permissions + client() + .copyObject( + builder -> + builder + .sourceBucket(bucket) + .sourceKey(version.key()) + .sourceVersionId(version.versionId()) + .destinationBucket(bucket) + .destinationKey(version.key())); + } catch (SdkException e) { + LOG.warn("Failed to recover object {}", version.key(), e); + return false; + } + + return true; + } }