Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ project(':iceberg-gcp') {
testImplementation "com.google.cloud:google-cloud-nio"

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')

testImplementation(libs.hadoop2.common) {
exclude group: 'org.apache.avro', module: 'avro'
Expand All @@ -662,6 +663,8 @@ project(':iceberg-gcp') {
exclude group: 'com.google.code.gson', module: 'gson'
}
testImplementation libs.esotericsoftware.kryo
testImplementation libs.mockserver.netty
testImplementation libs.mockserver.client.java
}
}

Expand Down
20 changes: 20 additions & 0 deletions gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class GCPProperties implements Serializable {
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";
// Boolean to explicitly configure "no authentication" for testing purposes using a GCS emulator
public static final String GCS_NO_AUTH = "gcs.no-auth";
public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT =
"gcs.oauth2.refresh-credentials-endpoint";

/** Controls whether vended credentials should be refreshed or not. Defaults to true. */
public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED =
"gcs.oauth2.refresh-credentials-enabled";

/** Configure the batch size used when deleting multiple files from a given GCS bucket */
public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
Expand All @@ -67,6 +73,8 @@ public class GCPProperties implements Serializable {
private boolean gcsNoAuth;
private String gcsOAuth2Token;
private Date gcsOAuth2TokenExpiresAt;
private String gcsOauth2RefreshCredentialsEndpoint;
private boolean gcsOauth2RefreshCredentialsEnabled;

private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

Expand Down Expand Up @@ -95,6 +103,10 @@ public GCPProperties(Map<String, String> properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}

gcsOauth2RefreshCredentialsEndpoint = properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT);
gcsOauth2RefreshCredentialsEnabled =
PropertyUtil.propertyAsBoolean(properties, GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true);
gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH, "false"));
Preconditions.checkState(
!(gcsOAuth2Token != null && gcsNoAuth),
Expand Down Expand Up @@ -154,4 +166,12 @@ public Optional<Date> oauth2TokenExpiresAt() {
public int deleteBatchSize() {
return gcsDeleteBatchSize;
}

public Optional<String> oauth2RefreshCredentialsEndpoint() {
return Optional.ofNullable(gcsOauth2RefreshCredentialsEndpoint);
}

public boolean oauth2RefreshCredentialsEnabled() {
return gcsOauth2RefreshCredentialsEnabled;
}
}
12 changes: 11 additions & 1 deletion gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
Expand Down Expand Up @@ -156,7 +157,16 @@ public void initialize(Map<String, String> props) {
// Explicitly configure an OAuth token.
AccessToken accessToken =
new AccessToken(token, gcpProperties.oauth2TokenExpiresAt().orElse(null));
builder.setCredentials(OAuth2Credentials.create(accessToken));
if (gcpProperties.oauth2RefreshCredentialsEnabled()
&& gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) {
builder.setCredentials(
OAuth2CredentialsWithRefresh.newBuilder()
.setAccessToken(accessToken)
.setRefreshHandler(OAuth2RefreshCredentialsHandler.create(properties))
.build());
} else {
builder.setCredentials(OAuth2Credentials.create(accessToken));
}
});

return builder.build().getService();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.gcp.gcs;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;

public class OAuth2RefreshCredentialsHandler
implements OAuth2CredentialsWithRefresh.OAuth2RefreshHandler {
private final Map<String, String> properties;

private OAuth2RefreshCredentialsHandler(Map<String, String> properties) {
Preconditions.checkArgument(
null != properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
"Invalid credentials endpoint: null");
this.properties = properties;
}

@Override
public AccessToken refreshAccessToken() {
LoadCredentialsResponse response;
try (RESTClient client = httpClient()) {
response =
client.get(
properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT),
null,
LoadCredentialsResponse.class,
OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)),
ErrorHandlers.defaultErrorHandler());
} catch (IOException e) {
throw new RuntimeException(e);
}

List<Credential> gcsCredentials =
response.credentials().stream()
.filter(c -> c.prefix().startsWith("gs"))
.collect(Collectors.toList());

Preconditions.checkState(!gcsCredentials.isEmpty(), "Invalid GCS Credentials: empty");
Preconditions.checkState(
gcsCredentials.size() == 1,
"Invalid GCS Credentials: only one GCS credential should exist");

Credential gcsCredential = gcsCredentials.get(0);
checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN);
checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);
String token = gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN);
String expiresAt = gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT);

return new AccessToken(token, new Date(Long.parseLong(expiresAt)));
}

private void checkCredential(Credential gcsCredential, String gcsOauth2Token) {
Preconditions.checkState(
gcsCredential.config().containsKey(gcsOauth2Token),
"Invalid GCS Credentials: %s not set",
gcsOauth2Token);
}

public static OAuth2RefreshCredentialsHandler create(Map<String, String> properties) {
return new OAuth2RefreshCredentialsHandler(properties);
}

private RESTClient httpClient() {
return HTTPClient.builder(properties)
.uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT))
.build();
}
}
30 changes: 30 additions & 0 deletions gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.iceberg.gcp;

import static org.apache.iceberg.gcp.GCPProperties.GCS_NO_AUTH;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
Expand Down Expand Up @@ -47,4 +49,32 @@ public void testOAuthWithNoAuth() {
assertThat(gcpProperties.noAuth()).isTrue();
assertThat(gcpProperties.oauth2Token()).isNotPresent();
}

@Test
public void refreshCredentialsEndpointSet() {
GCPProperties gcpProperties =
new GCPProperties(
ImmutableMap.of(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "/v1/credentials"));
assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isTrue();
assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint())
.isPresent()
.get()
.isEqualTo("/v1/credentials");
}

@Test
public void refreshCredentialsEndpointSetButRefreshDisabled() {
GCPProperties gcpProperties =
new GCPProperties(
ImmutableMap.of(
GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
"/v1/credentials",
GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED,
"false"));
assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isFalse();
assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint())
.isPresent()
.get()
.isEqualTo("/v1/credentials");
}
}
47 changes: 47 additions & 0 deletions gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
package org.apache.iceberg.gcp.gcs;

import static java.lang.String.format;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN;
import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.OAuth2CredentialsWithRefresh;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
Expand All @@ -32,6 +38,8 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -223,4 +231,43 @@ public void testResolvingFileIOLoad() {
.invoke("gs://foo/bar");
assertThat(result).isInstanceOf(GCSFileIO.class);
}

@Test
public void refreshCredentialsEndpointSet() {
Storage client;
try (GCSFileIO fileIO = new GCSFileIO()) {
fileIO.initialize(
ImmutableMap.of(
GCS_OAUTH2_TOKEN,
"gcsToken",
GCS_OAUTH2_TOKEN_EXPIRES_AT,
Long.toString(Instant.now().plus(5, ChronoUnit.MINUTES).toEpochMilli()),
GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
"/v1/credentials"));
client = fileIO.client();
}

assertThat(client.getOptions().getCredentials())
.isInstanceOf(OAuth2CredentialsWithRefresh.class);
}

@Test
public void refreshCredentialsEndpointSetButRefreshDisabled() {
Storage client;
try (GCSFileIO fileIO = new GCSFileIO()) {
fileIO.initialize(
ImmutableMap.of(
GCS_OAUTH2_TOKEN,
"gcsTokenWithoutRefresh",
GCS_OAUTH2_TOKEN_EXPIRES_AT,
Long.toString(Instant.now().plus(5, ChronoUnit.MINUTES).toEpochMilli()),
GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT,
"/v1/credentials",
GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED,
"false"));
client = fileIO.client();
}

assertThat(client.getOptions().getCredentials()).isInstanceOf(OAuth2Credentials.class);
}
}
Loading