Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
Expand All @@ -30,9 +31,10 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.ObjectVersion;
import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
import software.amazon.awssdk.services.s3control.S3ControlClient;
import software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest;
import software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
Expand Down Expand Up @@ -94,28 +96,46 @@ public static String testAccountId() {
}

public static void cleanS3Bucket(S3Client s3, String bucketName, String prefix) {
boolean hasContent = true;
while (hasContent) {
ListObjectsV2Response response =
s3.listObjectsV2(
ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build());
hasContent = response.hasContents();
if (hasContent) {
s3.deleteObjects(
DeleteObjectsRequest.builder()
.bucket(bucketName)
.delete(
Delete.builder()
.objects(
response.contents().stream()
.map(obj -> ObjectIdentifier.builder().key(obj.key()).build())
.collect(Collectors.toList()))
.build())
.build());
}
ListObjectVersionsIterable response =
s3.listObjectVersionsPaginator(
ListObjectVersionsRequest.builder().bucket(bucketName).prefix(prefix).build());
List<ObjectVersion> versionsToDelete = Lists.newArrayList();
int batchDeletionSize = 1000;
response.versions().stream()
.forEach(
version -> {
versionsToDelete.add(version);
if (versionsToDelete.size() == batchDeletionSize) {
deleteObjectVersions(s3, bucketName, versionsToDelete);
versionsToDelete.clear();
}
});

if (!versionsToDelete.isEmpty()) {
deleteObjectVersions(s3, bucketName, versionsToDelete);
}
}

private static void deleteObjectVersions(
S3Client s3, String bucket, List<ObjectVersion> objectVersions) {
s3.deleteObjects(
DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(
objectVersions.stream()
.map(
obj ->
ObjectIdentifier.builder()
.key(obj.key())
.versionId(obj.versionId())
.build())
.collect(Collectors.toList()))
.build())
.build());
}

public static void cleanGlueCatalog(GlueClient glue, List<String> namespaces) {
for (String namespace : namespaces) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,17 @@
import software.amazon.awssdk.services.kms.model.ListAliasesResponse;
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.BucketVersioningStatus;
import software.amazon.awssdk.services.s3.model.GetObjectAclRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAclResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Permission;
import software.amazon.awssdk.services.s3.model.PutBucketVersioningRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.VersioningConfiguration;
import software.amazon.awssdk.services.s3control.S3ControlClient;
import software.amazon.awssdk.utils.ImmutableMap;
import software.amazon.awssdk.utils.IoUtils;
Expand Down Expand Up @@ -106,6 +109,12 @@ public static void beforeClass() {
AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName);
AwsIntegTestUtil.createAccessPoint(
crossRegionS3Control, crossRegionAccessPointName, crossRegionBucketName);
s3.putBucketVersioning(
PutBucketVersioningRequest.builder()
.bucket(bucketName)
.versioningConfiguration(
VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
Comment on lines +112 to +116
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The simplest way for integration tests seemed to be just to enable bucket versioning for the bucket and then on cleanup, ensure deletion of every object version to make sure no garbage is left over.

.build());
}

@AfterAll
Expand Down Expand Up @@ -445,6 +454,35 @@ public void testPrefixDelete() {
});
}

@Test
public void testFileRecoveryHappyPath() throws Exception {
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties());
String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet");
write(s3FileIO, filePath);
s3FileIO.deleteFile(filePath);
assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse();

assertThat(s3FileIO.recoverFile(filePath)).isTrue();
assertThat(s3FileIO.newInputFile(filePath).exists()).isTrue();
}

@Test
public void testFileRecoveryFailsToRecover() throws Exception {
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new S3FileIOProperties());
s3.putBucketVersioning(
PutBucketVersioningRequest.builder()
.bucket(bucketName)
.versioningConfiguration(
VersioningConfiguration.builder().status(BucketVersioningStatus.SUSPENDED).build())
.build());
String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "unversionedFile.parquet");
write(s3FileIO, filePath);
s3FileIO.deleteFile(filePath);
assertThat(s3FileIO.newInputFile(filePath).exists()).isFalse();

assertThat(s3FileIO.recoverFile(filePath)).isFalse();
}

private S3FileIOProperties getDeletionTestProperties() {
S3FileIOProperties properties = new S3FileIOProperties();
properties.setDeleteBatchSize(deletionBatchSize);
Expand Down
50 changes: 49 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,6 +39,7 @@
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsRecoveryOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -52,6 +55,7 @@
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand All @@ -61,10 +65,12 @@
import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.ObjectVersion;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
import software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;

/**
* FileIO implementation backed by S3.
Expand All @@ -73,7 +79,7 @@
* schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes
* will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO implements CredentialSupplier, DelegateFileIO {
public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRecoveryOperations {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down Expand Up @@ -420,4 +426,46 @@ protected void finalize() throws Throwable {
}
}
}

@Override
public boolean recoverFile(String path) {
S3URI location = new S3URI(path, s3FileIOProperties.bucketToAccessPointMapping());
ListObjectVersionsIterable response =
client()
.listObjectVersionsPaginator(
builder -> builder.bucket(location.bucket()).prefix(location.key()));

// Recover to the last modified version, not isLatest,
// since isLatest is true for deletion markers.
Optional<ObjectVersion> recoverVersion =
response.versions().stream().max(Comparator.comparing(ObjectVersion::lastModified));

return recoverVersion.map(version -> recoverObject(version, location.bucket())).orElse(false);
}

private boolean recoverObject(ObjectVersion version, String bucket) {
if (version.isLatest()) {
return true;
}
Comment on lines +447 to +449
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] we can have an integ test trying to restore an existing object doesn't leads to creating a new version of obj

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this, sure I can do a follow on to verify that we skip the recovery if the latest version is not a deletion marker.


LOG.info("Attempting to recover object {}", version.key());
try {
// Perform a copy instead of deleting the delete marker
// so that recovery does not rely on delete permissions
client()
.copyObject(
builder ->
builder
.sourceBucket(bucket)
.sourceKey(version.key())
.sourceVersionId(version.versionId())
.destinationBucket(bucket)
.destinationKey(version.key()));
} catch (SdkException e) {
LOG.warn("Failed to recover object {}", version.key(), e);
return false;
}

return true;
}
}