diff --git a/build.gradle b/build.gradle index a654e1cba5fb..61bf31e48b8c 100644 --- a/build.gradle +++ b/build.gradle @@ -654,6 +654,7 @@ project(':iceberg-gcp') { testImplementation "com.google.cloud:google-cloud-nio" testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') testImplementation(libs.hadoop2.common) { exclude group: 'org.apache.avro', module: 'avro' @@ -662,6 +663,8 @@ project(':iceberg-gcp') { exclude group: 'com.google.code.gson', module: 'gson' } testImplementation libs.esotericsoftware.kryo + testImplementation libs.mockserver.netty + testImplementation libs.mockserver.client.java } } diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java index 4f60e2f91f91..c03906ae5d1e 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java @@ -43,6 +43,12 @@ public class GCPProperties implements Serializable { public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at"; // Boolean to explicitly configure "no authentication" for testing purposes using a GCS emulator public static final String GCS_NO_AUTH = "gcs.no-auth"; + public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT = + "gcs.oauth2.refresh-credentials-endpoint"; + + /** Controls whether vended credentials should be refreshed or not. Defaults to true. */ + public static final String GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED = + "gcs.oauth2.refresh-credentials-enabled"; /** Configure the batch size used when deleting multiple files from a given GCS bucket */ public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size"; @@ -67,6 +73,8 @@ public class GCPProperties implements Serializable { private boolean gcsNoAuth; private String gcsOAuth2Token; private Date gcsOAuth2TokenExpiresAt; + private String gcsOauth2RefreshCredentialsEndpoint; + private boolean gcsOauth2RefreshCredentialsEnabled; private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT; @@ -95,6 +103,10 @@ public GCPProperties(Map properties) { gcsOAuth2TokenExpiresAt = new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT))); } + + gcsOauth2RefreshCredentialsEndpoint = properties.get(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT); + gcsOauth2RefreshCredentialsEnabled = + PropertyUtil.propertyAsBoolean(properties, GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, true); gcsNoAuth = Boolean.parseBoolean(properties.getOrDefault(GCS_NO_AUTH, "false")); Preconditions.checkState( !(gcsOAuth2Token != null && gcsNoAuth), @@ -154,4 +166,12 @@ public Optional oauth2TokenExpiresAt() { public int deleteBatchSize() { return gcsDeleteBatchSize; } + + public Optional oauth2RefreshCredentialsEndpoint() { + return Optional.ofNullable(gcsOauth2RefreshCredentialsEndpoint); + } + + public boolean oauth2RefreshCredentialsEnabled() { + return gcsOauth2RefreshCredentialsEnabled; + } } diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java index 2201c876bd38..5737606aef5e 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java @@ -20,6 +20,7 @@ import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; +import com.google.auth.oauth2.OAuth2CredentialsWithRefresh; import com.google.cloud.NoCredentials; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; @@ -156,7 +157,16 @@ public void initialize(Map props) { // Explicitly configure an OAuth token. AccessToken accessToken = new AccessToken(token, gcpProperties.oauth2TokenExpiresAt().orElse(null)); - builder.setCredentials(OAuth2Credentials.create(accessToken)); + if (gcpProperties.oauth2RefreshCredentialsEnabled() + && gcpProperties.oauth2RefreshCredentialsEndpoint().isPresent()) { + builder.setCredentials( + OAuth2CredentialsWithRefresh.newBuilder() + .setAccessToken(accessToken) + .setRefreshHandler(OAuth2RefreshCredentialsHandler.create(properties)) + .build()); + } else { + builder.setCredentials(OAuth2Credentials.create(accessToken)); + } }); return builder.build().getService(); diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java new file mode 100644 index 000000000000..611e7baaec6e --- /dev/null +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2CredentialsWithRefresh; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.ErrorHandlers; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.responses.LoadCredentialsResponse; + +public class OAuth2RefreshCredentialsHandler + implements OAuth2CredentialsWithRefresh.OAuth2RefreshHandler { + private final Map properties; + + private OAuth2RefreshCredentialsHandler(Map properties) { + Preconditions.checkArgument( + null != properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT), + "Invalid credentials endpoint: null"); + this.properties = properties; + } + + @Override + public AccessToken refreshAccessToken() { + LoadCredentialsResponse response; + try (RESTClient client = httpClient()) { + response = + client.get( + properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT), + null, + LoadCredentialsResponse.class, + OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)), + ErrorHandlers.defaultErrorHandler()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + List gcsCredentials = + response.credentials().stream() + .filter(c -> c.prefix().startsWith("gs")) + .collect(Collectors.toList()); + + Preconditions.checkState(!gcsCredentials.isEmpty(), "Invalid GCS Credentials: empty"); + Preconditions.checkState( + gcsCredentials.size() == 1, + "Invalid GCS Credentials: only one GCS credential should exist"); + + Credential gcsCredential = gcsCredentials.get(0); + checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN); + checkCredential(gcsCredential, GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT); + String token = gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN); + String expiresAt = gcsCredential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT); + + return new AccessToken(token, new Date(Long.parseLong(expiresAt))); + } + + private void checkCredential(Credential gcsCredential, String gcsOauth2Token) { + Preconditions.checkState( + gcsCredential.config().containsKey(gcsOauth2Token), + "Invalid GCS Credentials: %s not set", + gcsOauth2Token); + } + + public static OAuth2RefreshCredentialsHandler create(Map properties) { + return new OAuth2RefreshCredentialsHandler(properties); + } + + private RESTClient httpClient() { + return HTTPClient.builder(properties) + .uri(properties.get(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT)) + .build(); + } +} diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java index c71b55828758..61bd069f0c27 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/GCPPropertiesTest.java @@ -19,6 +19,8 @@ package org.apache.iceberg.gcp; import static org.apache.iceberg.gcp.GCPProperties.GCS_NO_AUTH; +import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED; +import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT; import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; @@ -47,4 +49,32 @@ public void testOAuthWithNoAuth() { assertThat(gcpProperties.noAuth()).isTrue(); assertThat(gcpProperties.oauth2Token()).isNotPresent(); } + + @Test + public void refreshCredentialsEndpointSet() { + GCPProperties gcpProperties = + new GCPProperties( + ImmutableMap.of(GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "/v1/credentials")); + assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isTrue(); + assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint()) + .isPresent() + .get() + .isEqualTo("/v1/credentials"); + } + + @Test + public void refreshCredentialsEndpointSetButRefreshDisabled() { + GCPProperties gcpProperties = + new GCPProperties( + ImmutableMap.of( + GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, + "/v1/credentials", + GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, + "false")); + assertThat(gcpProperties.oauth2RefreshCredentialsEnabled()).isFalse(); + assertThat(gcpProperties.oauth2RefreshCredentialsEndpoint()) + .isPresent() + .get() + .isEqualTo("/v1/credentials"); + } } diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java index fbc3fe7114bb..6302f664b70a 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java @@ -19,11 +19,17 @@ package org.apache.iceberg.gcp.gcs; import static java.lang.String.format; +import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED; +import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT; +import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN; +import static org.apache.iceberg.gcp.GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.auth.oauth2.OAuth2CredentialsWithRefresh; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; @@ -32,6 +38,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Random; import java.util.stream.StreamSupport; @@ -223,4 +231,43 @@ public void testResolvingFileIOLoad() { .invoke("gs://foo/bar"); assertThat(result).isInstanceOf(GCSFileIO.class); } + + @Test + public void refreshCredentialsEndpointSet() { + Storage client; + try (GCSFileIO fileIO = new GCSFileIO()) { + fileIO.initialize( + ImmutableMap.of( + GCS_OAUTH2_TOKEN, + "gcsToken", + GCS_OAUTH2_TOKEN_EXPIRES_AT, + Long.toString(Instant.now().plus(5, ChronoUnit.MINUTES).toEpochMilli()), + GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, + "/v1/credentials")); + client = fileIO.client(); + } + + assertThat(client.getOptions().getCredentials()) + .isInstanceOf(OAuth2CredentialsWithRefresh.class); + } + + @Test + public void refreshCredentialsEndpointSetButRefreshDisabled() { + Storage client; + try (GCSFileIO fileIO = new GCSFileIO()) { + fileIO.initialize( + ImmutableMap.of( + GCS_OAUTH2_TOKEN, + "gcsTokenWithoutRefresh", + GCS_OAUTH2_TOKEN_EXPIRES_AT, + Long.toString(Instant.now().plus(5, ChronoUnit.MINUTES).toEpochMilli()), + GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, + "/v1/credentials", + GCS_OAUTH2_REFRESH_CREDENTIALS_ENABLED, + "false")); + client = fileIO.client(); + } + + assertThat(client.getOptions().getCredentials()).isInstanceOf(OAuth2Credentials.class); + } } diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java new file mode 100644 index 000000000000..c538745f2767 --- /dev/null +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandlerTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; + +import com.google.auth.oauth2.AccessToken; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.HttpMethod; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.credentials.ImmutableCredential; +import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; +import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.verify.VerificationTimes; + +public class OAuth2RefreshCredentialsHandlerTest { + private static final int PORT = 3333; + private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT); + private static ClientAndServer mockServer; + + @BeforeAll + public static void beforeAll() { + mockServer = startClientAndServer(PORT); + } + + @AfterAll + public static void stopServer() { + mockServer.stop(); + } + + @BeforeEach + public void before() { + mockServer.reset(); + } + + @Test + public void invalidOrMissingUri() { + assertThatThrownBy(() -> OAuth2RefreshCredentialsHandler.create(ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid credentials endpoint: null"); + + assertThatThrownBy( + () -> + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, "invalid uri")) + .refreshAccessToken()) + .isInstanceOf(RESTException.class) + .hasMessageStartingWith("Failed to create request URI from base invalid uri"); + } + + @Test + public void badRequest() { + HttpRequest mockRequest = + HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + HttpResponse mockResponse = HttpResponse.response().withStatusCode(400); + mockServer.when(mockRequest).respond(mockResponse); + + OAuth2RefreshCredentialsHandler handler = + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + + assertThatThrownBy(handler::refreshAccessToken) + .isInstanceOf(BadRequestException.class) + .hasMessageStartingWith("Malformed request"); + } + + @Test + public void noGcsCredentialInResponse() { + HttpRequest mockRequest = + HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + HttpResponse mockResponse = + HttpResponse.response( + LoadCredentialsResponseParser.toJson( + ImmutableLoadCredentialsResponse.builder().build())) + .withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + OAuth2RefreshCredentialsHandler handler = + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + + assertThatThrownBy(handler::refreshAccessToken) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid GCS Credentials: empty"); + } + + @Test + public void noGcsToken() { + HttpRequest mockRequest = + HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + Credential credential = + ImmutableCredential.builder() + .prefix("gs") + .config(ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, "1000")) + .build(); + HttpResponse mockResponse = + HttpResponse.response( + LoadCredentialsResponseParser.toJson( + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build())) + .withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + OAuth2RefreshCredentialsHandler handler = + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + + assertThatThrownBy(handler::refreshAccessToken) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid GCS Credentials: gcs.oauth2.token not set"); + } + + @Test + public void tokenWithoutExpiration() { + HttpRequest mockRequest = + HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + Credential credential = + ImmutableCredential.builder() + .prefix("gs") + .config(ImmutableMap.of(GCPProperties.GCS_OAUTH2_TOKEN, "gcsToken")) + .build(); + HttpResponse mockResponse = + HttpResponse.response( + LoadCredentialsResponseParser.toJson( + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build())) + .withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + OAuth2RefreshCredentialsHandler handler = + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + + assertThatThrownBy(handler::refreshAccessToken) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid GCS Credentials: gcs.oauth2.token-expires-at not set"); + } + + @Test + public void tokenWithExpiration() { + HttpRequest mockRequest = + HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + Credential credential = + ImmutableCredential.builder() + .prefix("gs") + .config( + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_TOKEN, + "gcsToken", + GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, + Long.toString(Instant.now().plus(5, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + HttpResponse mockResponse = + HttpResponse.response( + LoadCredentialsResponseParser.toJson( + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build())) + .withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + OAuth2RefreshCredentialsHandler handler = + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + + AccessToken accessToken = handler.refreshAccessToken(); + assertThat(accessToken.getTokenValue()) + .isEqualTo(credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN)); + assertThat(accessToken.getExpirationTime().toInstant().toEpochMilli()) + .isEqualTo( + Long.parseLong(credential.config().get(GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT))); + + // refresh always fetches a new token + AccessToken refreshedToken = handler.refreshAccessToken(); + assertThat(refreshedToken).isNotSameAs(accessToken); + + mockServer.verify(mockRequest, VerificationTimes.exactly(2)); + } + + @Test + public void multipleGcsCredentials() { + HttpRequest mockRequest = + HttpRequest.request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + Credential credentialOne = + ImmutableCredential.builder() + .prefix("gs") + .config( + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_TOKEN, + "gcsToken1", + GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, + Long.toString(Instant.now().plus(1, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + Credential credentialTwo = + ImmutableCredential.builder() + .prefix("gs://my-custom-prefix/xyz/long-prefix") + .config( + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_TOKEN, + "gcsToken2", + GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, + Long.toString(Instant.now().plus(2, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + Credential credentialThree = + ImmutableCredential.builder() + .prefix("gs://my-custom-prefix/xyz") + .config( + ImmutableMap.of( + GCPProperties.GCS_OAUTH2_TOKEN, + "gcsToken3", + GCPProperties.GCS_OAUTH2_TOKEN_EXPIRES_AT, + Long.toString(Instant.now().plus(3, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + HttpResponse mockResponse = + HttpResponse.response( + LoadCredentialsResponseParser.toJson( + ImmutableLoadCredentialsResponse.builder() + .addCredentials(credentialOne, credentialTwo, credentialThree) + .build())) + .withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + OAuth2RefreshCredentialsHandler handler = + OAuth2RefreshCredentialsHandler.create( + ImmutableMap.of(GCPProperties.GCS_OAUTH2_REFRESH_CREDENTIALS_ENDPOINT, URI)); + + assertThatThrownBy(handler::refreshAccessToken) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid GCS Credentials: only one GCS credential should exist"); + } +}