From 288152f0a45c272e0f895db787b5e246d9a6b2d3 Mon Sep 17 00:00:00 2001 From: Marc Cenac Date: Sat, 9 Nov 2024 23:30:10 -0600 Subject: [PATCH] GCP: Implement SupportsRecoveryOperations for GCSFileIO --- .../org/apache/iceberg/gcp/gcs/GCSFileIO.java | 122 ++++++++++++++- .../apache/iceberg/gcp/gcs/GCSFileIOTest.java | 145 ++++++++++++++++++ 2 files changed, 266 insertions(+), 1 deletion(-) diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java index 5737606aef5e..f90376031b29 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java @@ -24,9 +24,14 @@ import com.google.cloud.NoCredentials; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; +import java.time.Duration; +import java.util.Comparator; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import org.apache.iceberg.common.DynConstructors; @@ -36,7 +41,10 @@ import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsRecoveryOperations; import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.SerializableMap; @@ -56,7 +64,7 @@ *

See Cloud Storage * Overview */ -public class GCSFileIO implements DelegateFileIO { +public class GCSFileIO implements DelegateFileIO, SupportsRecoveryOperations { private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class); private static final String DEFAULT_METRICS_IMPL = "org.apache.iceberg.hadoop.HadoopMetricsContext"; @@ -242,4 +250,116 @@ private void internalDeleteFiles(Stream blobIdsToDelete) { Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize())) .forEach(batch -> client().delete(batch)); } + + @Override + public boolean recoverFile(String path) { + Preconditions.checkArgument( + !Strings.isNullOrEmpty(path), "Cannot recover file: path must not be null or empty"); + + try { + BlobId blobId = BlobId.fromGsUtilUri(path); + + // first attempt to restore with soft-delete + if (recoverSoftDeletedObject(blobId)) { + return true; + } + + // fallback to restoring by copying the latest version + if (recoverLatestVersion(blobId)) { + return true; + } + + } catch (IllegalArgumentException e) { + LOG.warn("Invalid GCS path format: {}", path, e); + } + + return false; + } + + /** + * Attempts to restore a soft-deleted object. + * + *

Requires {@code storage.objects.restore} permission + * + *

See docs + * + * @param blobId the blob identifier + * @return {@code true} if blob was recovered, {@code false} if not + */ + protected boolean recoverSoftDeletedObject(BlobId blobId) { + try { + BucketInfo.SoftDeletePolicy policy = client().get(blobId.getBucket()).getSoftDeletePolicy(); + if (Duration.ofSeconds(0).equals(policy.getRetentionDuration())) { + LOG.warn("Soft delete is disabled for {}", blobId.getBucket()); + return false; + } + + Optional latestSoftDeletedBlob = + client() + .list( + blobId.getBucket(), + Storage.BlobListOption.prefix(blobId.getName()), + Storage.BlobListOption.softDeleted(true)) + .streamAll() + .filter(blob -> blob.getName().equals(blobId.getName())) + .max(Comparator.comparing(Blob::getSoftDeleteTime)); + + if (latestSoftDeletedBlob.isPresent()) { + client().restore(latestSoftDeletedBlob.get().getBlobId()); + LOG.info("Soft delete object restored file {}", blobId); + return true; + } + LOG.warn("No soft deleted object was found"); + + } catch (StorageException e) { + LOG.warn("Failed to restore", e); + } + + return false; + } + + /** + * Attempts to restore the latest deleted object version. + * + *

See docs + * + * @param blobId the blob identifier + * @return {@code true} if blob was recovered, {@code false} if not + */ + protected boolean recoverLatestVersion(BlobId blobId) { + try { + if (!client().get(blobId.getBucket()).versioningEnabled()) { + LOG.warn("Object versioning is disabled for {}", blobId.getBucket()); + return false; + } + + Optional latestVersion = + client() + .list( + blobId.getBucket(), + Storage.BlobListOption.prefix(blobId.getName()), + Storage.BlobListOption.versions(true)) + .streamAll() + .filter(blob -> blob.getName().equals(blobId.getName())) + .max(Comparator.comparing(Blob::getUpdateTimeOffsetDateTime)); + + if (latestVersion.isPresent() && latestVersion.get().getDeleteTimeOffsetDateTime() != null) { + Storage.CopyRequest copyRequest = + Storage.CopyRequest.newBuilder() + .setSource(latestVersion.get().getBlobId()) + .setTarget(blobId) + .build(); + Blob blob = client().copy(copyRequest).getResult(); + LOG.info("Latest deleted version was restored for {}", blob.getBlobId()); + return true; + } + LOG.warn("No latest deleted version was found"); + + } catch (StorageException e) { + LOG.warn("Failed to restore latest deleted version", e); + } + + return false; + } } diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java index 6302f664b70a..a7f1d8fc4fb2 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java @@ -24,24 +24,37 @@ import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN; import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import com.google.api.gax.paging.Page; import com.google.auth.oauth2.OAuth2Credentials; import com.google.auth.oauth2.OAuth2CredentialsWithRefresh; +import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.CopyWriter; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.time.Duration; import java.time.Instant; +import java.time.OffsetDateTime; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Random; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.TestHelpers; @@ -270,4 +283,136 @@ public void refreshCredentialsEndpointSetButRefreshDisabled() { assertThat(client.getOptions().getCredentials()).isInstanceOf(OAuth2Credentials.class); } + + @Test + void recoverSoftDeletedObject_WhenSoftDeleteDisabled_ReturnsFalse() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.getSoftDeletePolicy()).thenReturn(policy); + when(policy.getRetentionDuration()).thenReturn(Duration.ofSeconds(0)); // soft-delete disabled + + assertThat(io.recoverSoftDeletedObject(blobId)).isFalse(); + } + + @Test + void recoverSoftDeletedObject_WhenSoftDeletedBlobExists_ReturnsTrue() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class); + Page page = mock(Page.class); + Blob softDeletedBlob = mock(Blob.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.getSoftDeletePolicy()).thenReturn(policy); + when(policy.getRetentionDuration()).thenReturn(Duration.ofDays(7)); // soft-delete enabled + doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any()); + when(page.streamAll()).thenReturn(Stream.of(softDeletedBlob)); + when(softDeletedBlob.getName()).thenReturn("object"); + when(softDeletedBlob.getSoftDeleteTime()).thenReturn(OffsetDateTime.now()); + when(softDeletedBlob.getBlobId()).thenReturn(blobId); + doAnswer(invocation -> softDeletedBlob).when(storage).restore(any(), any()); + + assertThat(io.recoverSoftDeletedObject(blobId)).isTrue(); + } + + @Test + void recoverSoftDeletedObject_WhenNoSoftDeletedBlobExists_ReturnsFalse() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class); + Page page = mock(Page.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.getSoftDeletePolicy()).thenReturn(policy); + when(policy.getRetentionDuration()).thenReturn(Duration.ofDays(7)); // soft-delete enabled + when(page.streamAll()).thenReturn(Stream.empty()); + + assertThat(io.recoverSoftDeletedObject(blobId)).isFalse(); + } + + @Test + void recoverSoftDelete_WhenStorageException_ReturnsFalse() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.getSoftDeletePolicy()).thenReturn(policy); + when(policy.getRetentionDuration()).thenReturn(Duration.ofDays(7)); // soft-delete enabled + doThrow(StorageException.class).when(storage).list(eq(TEST_BUCKET), any(), any()); + + assertThat(io.recoverLatestVersion(blobId)).isFalse(); + } + + @Test + void recoverLatestVersion_WhenVersioningDisabled_ReturnsFalse() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.versioningEnabled()).thenReturn(false); + + assertThat(io.recoverLatestVersion(blobId)).isFalse(); + } + + @Test + void recoverLatestVersion_WhenVersionExists_ReturnsTrue() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + Blob versionedBlob = mock(Blob.class); + CopyWriter copyWriter = mock(CopyWriter.class); + Page page = mock(Page.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.versioningEnabled()).thenReturn(true); + when(versionedBlob.getName()).thenReturn("object"); + when(versionedBlob.getUpdateTimeOffsetDateTime()).thenReturn(OffsetDateTime.now()); + when(versionedBlob.getDeleteTimeOffsetDateTime()).thenReturn(OffsetDateTime.now()); + when(versionedBlob.getBlobId()).thenReturn(blobId); + doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any()); + when(page.streamAll()).thenReturn(Stream.of(versionedBlob)); + // mock successful copy operation + doAnswer(invocation -> copyWriter).when(storage).copy(any()); + when(storage.copy(any())).thenReturn(copyWriter); + when(copyWriter.getResult()).thenReturn(versionedBlob); + + assertThat(io.recoverLatestVersion(blobId)).isTrue(); + } + + @Test + void recoverLatestVersion_WhenNoVersionExists_ReturnsFalse() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + Page page = mock(Page.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.versioningEnabled()).thenReturn(true); + doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any()); + when(page.streamAll()).thenReturn(Stream.empty()); + + assertThat(io.recoverLatestVersion(blobId)).isFalse(); + } + + @Test + void recoverLatestVersion_WhenCopyFails_ReturnsFalse() { + BlobId blobId = BlobId.of(TEST_BUCKET, "object"); + Bucket bucket = mock(Bucket.class); + Blob versionedBlob = mock(Blob.class); + Page page = mock(Page.class); + + when(storage.get(TEST_BUCKET)).thenReturn(bucket); + when(bucket.versioningEnabled()).thenReturn(true); + when(versionedBlob.getName()).thenReturn("object"); + when(versionedBlob.getUpdateTimeOffsetDateTime()).thenReturn(OffsetDateTime.now()); + when(versionedBlob.getBlobId()).thenReturn(blobId); + doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any()); + when(page.streamAll()).thenReturn(Stream.of(versionedBlob)); + // mock failed copy operation + doThrow(StorageException.class).when(storage).copy(any()); + + assertThat(io.recoverLatestVersion(blobId)).isFalse(); + } }