Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.util.PropertyUtil;

public class GCPProperties implements Serializable {
// Service Options
Expand All @@ -40,6 +41,14 @@ public class GCPProperties implements Serializable {
public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";

/** Configure the batch size used when deleting multiple files from a given GCS bucket */
public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
/**
* Max possible batch size for deletion. Currently, a max of 100 keys is advised, so we default to
* a number below that. https://cloud.google.com/storage/docs/batch
*/
public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;

private String projectId;
private String clientLibToken;
private String serviceHost;
Expand All @@ -54,6 +63,8 @@ public class GCPProperties implements Serializable {
private String gcsOAuth2Token;
private Date gcsOAuth2TokenExpiresAt;

private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

public GCPProperties() {}

public GCPProperties(Map<String, String> properties) {
Expand All @@ -78,6 +89,10 @@ public GCPProperties(Map<String, String> properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}

gcsDeleteBatchSize =
PropertyUtil.propertyAsInt(
properties, GCS_DELETE_BATCH_SIZE, GCS_DELETE_BATCH_SIZE_DEFAULT);
}

public Optional<Integer> channelReadChunkSize() {
Expand Down Expand Up @@ -119,4 +134,8 @@ public Optional<String> oauth2Token() {
public Optional<Date> oauth2TokenExpiresAt() {
return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
}

/**
 * Returns the number of blobs grouped into a single bulk-delete request, as configured via
 * {@code GCS_DELETE_BATCH_SIZE} (default {@code GCS_DELETE_BATCH_SIZE_DEFAULT}).
 */
public int deleteBatchSize() {
return gcsDeleteBatchSize;
}
}
50 changes: 49 additions & 1 deletion gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
Expand All @@ -48,7 +56,7 @@
* <p>See <a href="https://cloud.google.com/storage/docs/folders#overview">Cloud Storage
* Overview</a>
*/
public class GCSFileIO implements FileIO {
public class GCSFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations {
private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
Expand Down Expand Up @@ -174,4 +182,44 @@ public void close() {
}
}
}

@Override
public Iterable<FileInfo> listPrefix(String prefix) {
  GCSLocation gcsLocation = new GCSLocation(prefix);
  // Lazily list the bucket on each iteration; each blob is mapped to a FileInfo whose
  // location is the fully qualified gs:// URI.
  return () ->
      client()
          .list(gcsLocation.bucket(), Storage.BlobListOption.prefix(gcsLocation.prefix()))
          .streamAll()
          .map(
              blob -> {
                String uri = String.format("gs://%s/%s", blob.getBucket(), blob.getName());
                return new FileInfo(uri, blob.getSize(), createTimeMillis(blob));
              })
          .iterator();
}

/** Returns the blob's creation time in epoch milliseconds, or 0 when it is not populated. */
private long createTimeMillis(Blob blob) {
  if (blob.getCreateTimeOffsetDateTime() != null) {
    return blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
  }
  return 0;
}

@Override
public void deletePrefix(String prefix) {
  // Resolve every object under the prefix to a BlobId and hand them off for batched deletion.
  internalDeleteFiles(
      Streams.stream(listPrefix(prefix)).map(FileInfo::location).map(BlobId::fromGsUtilUri));
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  // Each path must be a fully qualified gs:// URI, per BlobId.fromGsUtilUri.
  Stream<BlobId> blobIds = Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri);
  internalDeleteFiles(blobIds);
}

/**
 * Deletes the given blobs in batches of {@code gcpProperties.deleteBatchSize()}.
 *
 * <p>{@code Storage.delete(Iterable)} returns one {@code Boolean} per blob; previously these
 * results were discarded, silently swallowing failed deletions. Failures are now counted and
 * surfaced, matching the {@code SupportsBulkOperations} contract.
 *
 * <p>NOTE(review): the client also returns {@code false} for blobs that did not exist; confirm
 * whether missing blobs should count as failures for this FileIO.
 *
 * @throws BulkDeletionFailureException if any blob could not be deleted
 */
private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
  int failedDeletions =
      Streams.stream(
              Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
          .mapToInt(batch -> (int) client().delete(batch).stream().filter(ok -> !ok).count())
          .sum();
  if (failedDeletions > 0) {
    throw new BulkDeletionFailureException(failedDeletions);
  }
}
}
76 changes: 76 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.gcp.gcs;

import com.google.cloud.storage.BlobId;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* This class represents a fully qualified location in GCS expressed as a URI. This class allows for
* URIs with only a bucket and no path specified, unlike with {@link BlobId#fromGsUtilUri(String)}.
*/
/**
 * This class represents a fully qualified location in GCS expressed as a URI. This class allows for
 * URIs with only a bucket and no path specified, unlike with {@link BlobId#fromGsUtilUri(String)}.
 */
class GCSLocation {
  private static final String EXPECTED_SCHEME = "gs";

  private final String bucket;
  private final String prefix;

  /**
   * Creates a new GCSLocation with the form of scheme://bucket/path?query#fragment
   *
   * @param location fully qualified URI
   */
  GCSLocation(String location) {
    Preconditions.checkArgument(location != null, "Invalid location: null");

    // Exactly one "://" separator must be present; anything else is malformed.
    String[] schemeSplit = location.split("://", -1);
    ValidationException.check(
        schemeSplit.length == 2, "Invalid GCS URI, cannot determine scheme: %s", location);
    ValidationException.check(
        EXPECTED_SCHEME.equals(schemeSplit[0]),
        "Invalid GCS URI, invalid scheme: %s",
        schemeSplit[0]);

    // Everything up to the first "/" is the bucket; the remainder (possibly empty) is the path.
    String[] authoritySplit = schemeSplit[1].split("/", 2);
    this.bucket = authoritySplit[0];

    String path = authoritySplit.length > 1 ? authoritySplit[1] : "";
    // Strip a trailing query string and fragment, if present.
    path = path.split("\\?", -1)[0];
    path = path.split("#", -1)[0];
    this.prefix = path;
  }

  /** Returns GCS bucket name. */
  public String bucket() {
    return bucket;
  }

  /** Returns GCS object name prefix. */
  public String prefix() {
    return prefix;
  }
}
89 changes: 87 additions & 2 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@

import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.stream.StreamSupport;
import org.apache.iceberg.TestHelpers;
Expand All @@ -36,19 +41,35 @@
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class GCSFileIOTest {
private static final String TEST_BUCKET = "TEST_BUCKET";
private final Random random = new Random(1);

private final Storage storage = LocalStorageHelper.getOptions().getService();
private final Storage storage = spy(LocalStorageHelper.getOptions().getService());
private GCSFileIO io;

@BeforeEach
public void before() {
  // LocalStorageHelper has no batch-delete support, so emulate Storage.delete(Iterable)
  // by deleting one blob at a time and collecting the per-blob results.
  doAnswer(
          invocation -> {
            Iterable<BlobId> blobIds = invocation.getArgument(0);
            List<Boolean> results = Lists.newArrayList();
            blobIds.forEach(blobId -> results.add(storage.delete(blobId)));
            return results;
          })
      .when(storage)
      .delete(any(Iterable.class));

  io = new GCSFileIO(() -> storage, new GCPProperties());
}

Expand Down Expand Up @@ -91,7 +112,7 @@ public void testDelete() {
.count())
.isEqualTo(1);

io.deleteFile(format("gs://%s/%s", TEST_BUCKET, path));
io.deleteFile(gsUri(path));

// The bucket should now be empty
assertThat(
Expand All @@ -100,6 +121,70 @@ public void testDelete() {
.isZero();
}

/** Builds a fully qualified gs:// URI for the given path in the test bucket. */
private String gsUri(String path) {
  return String.format("gs://%s/%s", TEST_BUCKET, path);
}

@Test
public void testListPrefix() {
String prefix = "list/path/";
String path1 = prefix + "data1.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
String path2 = prefix + "data2.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
String path3 = "list/skip/data3.dat";
storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

assertThat(StreamSupport.stream(io.listPrefix(gsUri("list/")).spliterator(), false).count())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I missed this initially, but there's no need to wrap this in a stream. This (and other places) can be simplified to assertThat(io.listPrefix(gsUri("list/"))).hasSize(3);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed there are also existing tests in that file that do the same thing, could you please update those as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent was to avoid using the FileIO we're testing to do the assertion and use the Google client directly, I assume the same for the existing tests.

.isEqualTo(3);

assertThat(StreamSupport.stream(io.listPrefix(gsUri(prefix)).spliterator(), false).count())
.isEqualTo(2);

assertThat(StreamSupport.stream(io.listPrefix(gsUri(path1)).spliterator(), false).count())
.isEqualTo(1);
}

@Test
public void testDeleteFiles() {
  String prefix = "del/path/";
  String path1 = prefix + "data1.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
  String path2 = prefix + "data2.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
  String path3 = "del/skip/data3.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(3);

  // deleteFiles takes any Iterable<String>; an ImmutableList is sufficient — no need to
  // wrap it in a lambda that re-streams it.
  io.deleteFiles(ImmutableList.of(gsUri(path1), gsUri(path3)));

  // Only path2 should remain under del/.
  assertThat(io.listPrefix(gsUri("del/"))).hasSize(1);
}

@Test
public void testDeletePrefix() {
  String prefix = "del/path/";
  String path1 = prefix + "data1.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path1).build());
  String path2 = prefix + "data2.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path2).build());
  String path3 = "del/skip/data3.dat";
  storage.create(BlobInfo.newBuilder(TEST_BUCKET, path3).build());

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(3);

  // Removes everything under del/path/ but must not touch del/skip/.
  io.deletePrefix(gsUri(prefix));

  assertThat(io.listPrefix(gsUri("del/"))).hasSize(1);
}

@Test
public void testGCSFileIOKryoSerialization() throws IOException {
FileIO testGCSFileIO = new GCSFileIO();
Expand Down
Loading