diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index 109a4d21ed3c..521eb4c6c867 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -22,6 +22,7 @@
 import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.iceberg.util.PropertyUtil;
 
 public class GCPProperties implements Serializable {
   // Service Options
@@ -40,6 +41,14 @@ public class GCPProperties implements Serializable {
   public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
   public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";
 
+  /** Configure the batch size used when deleting multiple files from a given GCS bucket */
+  public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
+  /**
+   * Max possible batch size for deletion. Currently, a max of 100 keys is advised, so we default to
+   * a number below that. https://cloud.google.com/storage/docs/batch
+   */
+  public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;
+
   private String projectId;
   private String clientLibToken;
   private String serviceHost;
@@ -54,6 +63,8 @@ public class GCPProperties implements Serializable {
   private String gcsOAuth2Token;
   private Date gcsOAuth2TokenExpiresAt;
 
+  private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
+
   public GCPProperties() {}
 
   public GCPProperties(Map<String, String> properties) {
@@ -78,6 +89,10 @@ public GCPProperties(Map<String, String> properties) {
       gcsOAuth2TokenExpiresAt =
           new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
     }
+
+    gcsDeleteBatchSize =
+        PropertyUtil.propertyAsInt(
+            properties, GCS_DELETE_BATCH_SIZE, GCS_DELETE_BATCH_SIZE_DEFAULT);
   }
 
   public Optional<Integer> channelReadChunkSize() {
@@ -119,4 +134,8 @@ public Optional<String> oauth2Token() {
   public Optional<Date> oauth2TokenExpiresAt() {
     return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
   }
+
+  public int deleteBatchSize() {
+    return gcsDeleteBatchSize;
+  }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 54af44e43da8..8e3db34c6b68 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -20,17 +20,25 @@
 
 import com.google.auth.oauth2.AccessToken;
 import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
@@ -48,7 +56,7 @@
  * <p>See <a href="https://cloud.google.com/storage/docs">Cloud Storage
  * Overview</a>
  */
-public class GCSFileIO implements FileIO {
+public class GCSFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations {
   private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
   private static final String DEFAULT_METRICS_IMPL =
       "org.apache.iceberg.hadoop.HadoopMetricsContext";
@@ -174,4 +182,44 @@ public void close() {
       }
     }
   }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    GCSLocation location = new GCSLocation(prefix);
+    return () ->
+        client()
+            .list(location.bucket(), Storage.BlobListOption.prefix(location.prefix()))
+            .streamAll()
+            .map(
+                blob ->
+                    new FileInfo(
+                        String.format("gs://%s/%s", blob.getBucket(), blob.getName()),
+                        blob.getSize(),
+                        createTimeMillis(blob)))
+            .iterator();
+  }
+
+  private long createTimeMillis(Blob blob) {
+    if (blob.getCreateTimeOffsetDateTime() == null) {
+      return 0;
+    }
+    return blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+    internalDeleteFiles(
+        Streams.stream(listPrefix(prefix))
+            .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location())));
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    internalDeleteFiles(Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri));
+  }
+
+  private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
+    Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
+        .forEach(batch -> client().delete(batch));
+  }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
new file mode 100644
index 000000000000..e1de27dcd577
--- /dev/null
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This class represents a fully qualified location in GCS expressed as a URI. This class allows for
+ * URIs with only a bucket and no path specified, unlike with {@link BlobId#fromGsUtilUri(String)}.
+ */
+class GCSLocation {
+  private static final String SCHEME_DELIM = "://";
+  private static final String PATH_DELIM = "/";
+  private static final String QUERY_DELIM = "\\?";
+  private static final String FRAGMENT_DELIM = "#";
+
+  private static final String EXPECTED_SCHEME = "gs";
+
+  private final String bucket;
+  private final String prefix;
+
+  /**
+   * Creates a new GCSLocation with the form of scheme://bucket/path?query#fragment
+   *
+   * @param location fully qualified URI
+   */
+  GCSLocation(String location) {
+    Preconditions.checkArgument(location != null, "Invalid location: null");
+
+    String[] schemeSplit = location.split(SCHEME_DELIM, -1);
+    ValidationException.check(
+        schemeSplit.length == 2, "Invalid GCS URI, cannot determine scheme: %s", location);
+
+    String scheme = schemeSplit[0];
+    ValidationException.check(
+        EXPECTED_SCHEME.equals(scheme), "Invalid GCS URI, invalid scheme: %s", scheme);
+
+    String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
+
+    this.bucket = authoritySplit[0];
+
+    // Strip query and fragment if they exist
+    String path = authoritySplit.length > 1 ? authoritySplit[1] : "";
+    path = path.split(QUERY_DELIM, -1)[0];
+    path = path.split(FRAGMENT_DELIM, -1)[0];
+    this.prefix = path;
+  }
+
+  /** Returns GCS bucket name. */
+  public String bucket() {
+    return bucket;
+  }
+
+  /** Returns GCS object name prefix. */
+  public String prefix() {
+    return prefix;
+  }
+}
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index 4e91b3e0e753..013c4d4955a2 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -20,7 +20,11 @@
 
 import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
+import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
@@ -28,6 +32,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Random;
 import java.util.stream.StreamSupport;
 import org.apache.iceberg.TestHelpers;
@@ -36,7 +41,9 @@
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -44,11 +51,25 @@ public class GCSFileIOTest {
   private static final String TEST_BUCKET = "TEST_BUCKET";
   private final Random random = new Random(1);
 
-  private final Storage storage = LocalStorageHelper.getOptions().getService();
+  private final Storage storage = spy(LocalStorageHelper.getOptions().getService());
   private GCSFileIO io;
 
   @BeforeEach
   public void before() {
+    // LocalStorageHelper doesn't support batch operations, so mock that here
+    doAnswer(
+            invoke -> {
+              Iterable<BlobId> iter = invoke.getArgument(0);
+              List<Boolean> answer = Lists.newArrayList();
+              iter.forEach(
+                  blobId -> {
+                    answer.add(storage.delete(blobId));
+                  });
+              return answer;
+            })
+        .when(storage)
+        .delete(any(Iterable.class));
+
     io = new GCSFileIO(() -> storage, new GCPProperties());
   }
 
@@ -91,7 +112,7 @@ public void testDelete() {
             .count())
         .isEqualTo(1);
 
-    io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
+    io.deleteFile(gsUri(path));
 
     // The bucket should now be empty
     assertThat(
@@ -100,6 +121,70 @@ public void testDelete() {
         .isZero();
   }
 
+  private String gsUri(String path) {
+    return format("gs://%s/%s", TEST_BUCKET, path);
+  }
+
+  @Test
+  public void testListPrefix() {
+    String prefix = "list/path/";
+    String path1 = prefix + "data1.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
+    String path2 = prefix + "data2.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
+    String path3 = "list/skip/data3.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri("list/")).spliterator(), false).count())
+        .isEqualTo(3);
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri(prefix)).spliterator(), false).count())
+        .isEqualTo(2);
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri(path1)).spliterator(), false).count())
+        .isEqualTo(1);
+  }
+
+  @Test
+  public void testDeleteFiles() {
+    String prefix = "del/path/";
+    String path1 = prefix + "data1.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
+    String path2 = prefix + "data2.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
+    String path3 = "del/skip/data3.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
+        .isEqualTo(3);
+
+    Iterable<String> deletes =
+        () -> ImmutableList.of(gsUri(path1), gsUri(path3)).stream().iterator();
+    io.deleteFiles(deletes);
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
+        .isEqualTo(1);
+  }
+
+  @Test
+  public void testDeletePrefix() {
+    String prefix = "del/path/";
+    String path1 = prefix + "data1.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
+    String path2 = prefix + "data2.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
+    String path3 = "del/skip/data3.dat";
+    storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
+        .isEqualTo(3);
+
+    io.deletePrefix(gsUri(prefix));
+
+    assertThat(StreamSupport.stream(io.listPrefix(gsUri("del/")).spliterator(), false).count())
+        .isEqualTo(1);
+  }
+
   @Test
   public void testGCSFileIOKryoSerialization() throws IOException {
     FileIO testGCSFileIO = new GCSFileIO();
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
new file mode 100644
index 000000000000..b92cfc35aec5
--- /dev/null
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import org.apache.iceberg.exceptions.ValidationException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class GCSLocationTest {
+  @Test
+  public void testLocationParsing() {
+    String p1 = "gs://bucket/path/to/prefix";
+    GCSLocation location = new GCSLocation(p1);
+
+    Assertions.assertThat(location.bucket()).isEqualTo("bucket");
+    Assertions.assertThat(location.prefix()).isEqualTo("path/to/prefix");
+  }
+
+  @Test
+  public void testEncodedString() {
+    String p1 = "gs://bucket/path%20to%20prefix";
+    GCSLocation location = new GCSLocation(p1);
+
+    Assertions.assertThat(location.bucket()).isEqualTo("bucket");
+    Assertions.assertThat(location.prefix()).isEqualTo("path%20to%20prefix");
+  }
+
+  @Test
+  public void testMissingScheme() {
+    Assertions.assertThatThrownBy(() -> new GCSLocation("/path/to/prefix"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Invalid GCS URI, cannot determine scheme: /path/to/prefix");
+  }
+
+  @Test
+  public void testInvalidScheme() {
+    Assertions.assertThatThrownBy(() -> new GCSLocation("s3://bucket/path/to/prefix"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Invalid GCS URI, invalid scheme: s3");
+  }
+
+  @Test
+  public void testOnlyBucketNameLocation() {
+    String p1 = "gs://bucket";
+    GCSLocation location = new GCSLocation(p1);
+
+    Assertions.assertThat(location.bucket()).isEqualTo("bucket");
+    Assertions.assertThat(location.prefix()).isEqualTo("");
+  }
+
+  @Test
+  public void testQueryAndFragment() {
+    String p1 = "gs://bucket/path/to/prefix?query=foo#bar";
+    GCSLocation location = new GCSLocation(p1);
+
+    Assertions.assertThat(location.bucket()).isEqualTo("bucket");
+    Assertions.assertThat(location.prefix()).isEqualTo("path/to/prefix");
+  }
+}