Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import java.time.Duration;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.iceberg.common.DynConstructors;
Expand All @@ -36,7 +41,10 @@
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsRecoveryOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
Expand All @@ -56,7 +64,7 @@
* <p>See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage
* Overview</a>
*/
public class GCSFileIO implements DelegateFileIO {
public class GCSFileIO implements DelegateFileIO, SupportsRecoveryOperations {
private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down Expand Up @@ -242,4 +250,116 @@ private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
.forEach(batch -> client().delete(batch));
}

@Override
public boolean recoverFile(String path) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(path), "Cannot recover file: path must not be null or empty");

try {
BlobId blobId = BlobId.fromGsUtilUri(path);

// first attempt to restore with soft-delete
if (recoverSoftDeletedObject(blobId)) {
return true;
}

// fallback to restoring by copying the latest version
if (recoverLatestVersion(blobId)) {
return true;
}

} catch (IllegalArgumentException e) {
LOG.warn("Invalid GCS path format: {}", path, e);
}

return false;
}

/**
* Attempts to restore a soft-deleted object.
*
* <p>Requires {@code storage.objects.restore} permission
*
* <p>See <a
* href="https://cloud.google.com/storage/docs/use-soft-deleted-objects#restore">docs</a>
*
* @param blobId the blob identifier
* @return {@code true} if blob was recovered, {@code false} if not
*/
protected boolean recoverSoftDeletedObject(BlobId blobId) {
try {
BucketInfo.SoftDeletePolicy policy = client().get(blobId.getBucket()).getSoftDeletePolicy();
if (Duration.ofSeconds(0).equals(policy.getRetentionDuration())) {
LOG.warn("Soft delete is disabled for {}", blobId.getBucket());
return false;
}

Optional<Blob> latestSoftDeletedBlob =
client()
.list(
blobId.getBucket(),
Storage.BlobListOption.prefix(blobId.getName()),
Storage.BlobListOption.softDeleted(true))
.streamAll()
.filter(blob -> blob.getName().equals(blobId.getName()))
.max(Comparator.comparing(Blob::getSoftDeleteTime));

if (latestSoftDeletedBlob.isPresent()) {
client().restore(latestSoftDeletedBlob.get().getBlobId());
LOG.info("Soft delete object restored file {}", blobId);
return true;
}
LOG.warn("No soft deleted object was found");

} catch (StorageException e) {
LOG.warn("Failed to restore", e);
}

return false;
}

/**
* Attempts to restore the latest deleted object version.
*
* <p>See <a href="https://cloud.google.com/storage/docs/using-versioned-objects#restore">docs</a>
*
* @param blobId the blob identifier
* @return {@code true} if blob was recovered, {@code false} if not
*/
protected boolean recoverLatestVersion(BlobId blobId) {
try {
if (!client().get(blobId.getBucket()).versioningEnabled()) {
LOG.warn("Object versioning is disabled for {}", blobId.getBucket());
return false;
}

Optional<Blob> latestVersion =
client()
.list(
blobId.getBucket(),
Storage.BlobListOption.prefix(blobId.getName()),
Storage.BlobListOption.versions(true))
.streamAll()
.filter(blob -> blob.getName().equals(blobId.getName()))
.max(Comparator.comparing(Blob::getUpdateTimeOffsetDateTime));

if (latestVersion.isPresent() && latestVersion.get().getDeleteTimeOffsetDateTime() != null) {
Storage.CopyRequest copyRequest =
Storage.CopyRequest.newBuilder()
.setSource(latestVersion.get().getBlobId())
.setTarget(blobId)
.build();
Blob blob = client().copy(copyRequest).getResult();
LOG.info("Latest deleted version was restored for {}", blob.getBlobId());
return true;
}
LOG.warn("No latest deleted version was found");

} catch (StorageException e) {
LOG.warn("Failed to restore latest deleted version", e);
}

return false;
}
}
145 changes: 145 additions & 0 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,37 @@
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.gax.paging.Page;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.TestHelpers;
Expand Down Expand Up @@ -270,4 +283,136 @@ public void refreshCredentialsEndpointSetButRefreshDisabled() {

assertThat(client.getOptions().getCredentials()).isInstanceOf(OAuth2Credentials.class);
}

@Test
void recoverSoftDeletedObject_WhenSoftDeleteDisabled_ReturnsFalse() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.getSoftDeletePolicy()).thenReturn(policy);
when(policy.getRetentionDuration()).thenReturn(Duration.ofSeconds(0)); // soft-delete disabled

assertThat(io.recoverSoftDeletedObject(blobId)).isFalse();
}

@Test
void recoverSoftDeletedObject_WhenSoftDeletedBlobExists_ReturnsTrue() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class);
Page<Blob> page = mock(Page.class);
Blob softDeletedBlob = mock(Blob.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.getSoftDeletePolicy()).thenReturn(policy);
when(policy.getRetentionDuration()).thenReturn(Duration.ofDays(7)); // soft-delete enabled
doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any());
when(page.streamAll()).thenReturn(Stream.of(softDeletedBlob));
when(softDeletedBlob.getName()).thenReturn("object");
when(softDeletedBlob.getSoftDeleteTime()).thenReturn(OffsetDateTime.now());
when(softDeletedBlob.getBlobId()).thenReturn(blobId);
doAnswer(invocation -> softDeletedBlob).when(storage).restore(any(), any());

assertThat(io.recoverSoftDeletedObject(blobId)).isTrue();
}

@Test
void recoverSoftDeletedObject_WhenNoSoftDeletedBlobExists_ReturnsFalse() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class);
Page<Blob> page = mock(Page.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.getSoftDeletePolicy()).thenReturn(policy);
when(policy.getRetentionDuration()).thenReturn(Duration.ofDays(7)); // soft-delete enabled
when(page.streamAll()).thenReturn(Stream.empty());

assertThat(io.recoverSoftDeletedObject(blobId)).isFalse();
}

@Test
void recoverSoftDelete_WhenStorageException_ReturnsFalse() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
BucketInfo.SoftDeletePolicy policy = mock(BucketInfo.SoftDeletePolicy.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.getSoftDeletePolicy()).thenReturn(policy);
when(policy.getRetentionDuration()).thenReturn(Duration.ofDays(7)); // soft-delete enabled
doThrow(StorageException.class).when(storage).list(eq(TEST_BUCKET), any(), any());

assertThat(io.recoverLatestVersion(blobId)).isFalse();
}

@Test
void recoverLatestVersion_WhenVersioningDisabled_ReturnsFalse() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.versioningEnabled()).thenReturn(false);

assertThat(io.recoverLatestVersion(blobId)).isFalse();
}

@Test
void recoverLatestVersion_WhenVersionExists_ReturnsTrue() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
Blob versionedBlob = mock(Blob.class);
CopyWriter copyWriter = mock(CopyWriter.class);
Page<Blob> page = mock(Page.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.versioningEnabled()).thenReturn(true);
when(versionedBlob.getName()).thenReturn("object");
when(versionedBlob.getUpdateTimeOffsetDateTime()).thenReturn(OffsetDateTime.now());
when(versionedBlob.getDeleteTimeOffsetDateTime()).thenReturn(OffsetDateTime.now());
when(versionedBlob.getBlobId()).thenReturn(blobId);
doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any());
when(page.streamAll()).thenReturn(Stream.of(versionedBlob));
// mock successful copy operation
doAnswer(invocation -> copyWriter).when(storage).copy(any());
when(storage.copy(any())).thenReturn(copyWriter);
when(copyWriter.getResult()).thenReturn(versionedBlob);

assertThat(io.recoverLatestVersion(blobId)).isTrue();
}

@Test
void recoverLatestVersion_WhenNoVersionExists_ReturnsFalse() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
Page<Blob> page = mock(Page.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.versioningEnabled()).thenReturn(true);
doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any());
when(page.streamAll()).thenReturn(Stream.empty());

assertThat(io.recoverLatestVersion(blobId)).isFalse();
}

@Test
void recoverLatestVersion_WhenCopyFails_ReturnsFalse() {
BlobId blobId = BlobId.of(TEST_BUCKET, "object");
Bucket bucket = mock(Bucket.class);
Blob versionedBlob = mock(Blob.class);
Page<Blob> page = mock(Page.class);

when(storage.get(TEST_BUCKET)).thenReturn(bucket);
when(bucket.versioningEnabled()).thenReturn(true);
when(versionedBlob.getName()).thenReturn("object");
when(versionedBlob.getUpdateTimeOffsetDateTime()).thenReturn(OffsetDateTime.now());
when(versionedBlob.getBlobId()).thenReturn(blobId);
doAnswer(invoke -> page).when(storage).list(eq(TEST_BUCKET), any(), any());
when(page.streamAll()).thenReturn(Stream.of(versionedBlob));
// mock failed copy operation
doThrow(StorageException.class).when(storage).copy(any());

assertThat(io.recoverLatestVersion(blobId)).isFalse();
}
}