diff --git a/.gitignore b/.gitignore index cbe36d543f9e..e830287c9472 100644 --- a/.gitignore +++ b/.gitignore @@ -54,6 +54,7 @@ coverage.xml # vscode/eclipse files .classpath +.factorypath .project .settings bin/ diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index b15b2ce84b0f..c84b79a79506 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1235,6 +1235,10 @@ acceptedBreaks: old: "method void org.apache.iceberg.data.parquet.BaseParquetWriter::()" new: "method void org.apache.iceberg.data.parquet.BaseParquetWriter::()" justification: "Changing deprecated code" + - code: "java.method.addedToInterface" + new: "method java.lang.String org.apache.iceberg.rest.auth.AuthConfig::refreshToken()" + justification: "Adding an additional refresh token config to support the refresh\ + \ token OAuth flow." apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 1cde2b8ad43c..9771d67d3312 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -227,17 +227,19 @@ public void initialize(String name, Map unresolved) { OAuthTokenResponse authResponse; String credential = props.get(OAuth2Properties.CREDENTIAL); boolean hasCredential = credential != null && !credential.isEmpty(); + String refreshToken = props.get(OAuth2Properties.REFRESH_TOKEN); + boolean hasRefreshToken = refreshToken != null; String scope = props.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE); Map optionalOAuthParams = OAuth2Util.buildOptionalParam(props); if (!props.containsKey(OAuth2Properties.OAUTH2_SERVER_URI) - && (hasInitToken || hasCredential) + && (hasInitToken || hasCredential || hasRefreshToken) && !PropertyUtil.propertyAsBoolean(props, "rest.sigv4-enabled", false)) { LOG.warn( - "Iceberg REST client is missing the OAuth2 server URI configuration and defaults to {}/{}. " - + "This automatic fallback will be removed in a future Iceberg release." - + "It is recommended to configure the OAuth2 endpoint using the '{}' property to be prepared. " - + "This warning will disappear if the OAuth2 endpoint is explicitly configured. " - + "See https://github.com/apache/iceberg/issues/10537", + "Iceberg REST client is missing the OAuth2 server URI configuration and defaults to" + + " {}/{}. This automatic fallback will be removed in a future Iceberg release.It is" + + " recommended to configure the OAuth2 endpoint using the '{}' property to be" + + " prepared. This warning will disappear if the OAuth2 endpoint is explicitly" + + " configured. See https://github.com/apache/iceberg/issues/10537", RESTUtil.stripTrailingSlash(props.get(CatalogProperties.URI)), ResourcePaths.tokens(), OAuth2Properties.OAUTH2_SERVER_URI); @@ -294,6 +296,7 @@ public void initialize(String name, Map unresolved) { baseHeaders, AuthConfig.builder() .credential(credential) + .refreshToken(refreshToken) .scope(scope) .oauth2ServerUri(oauth2ServerUri) .optionalOAuthParams(optionalOAuthParams) diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java b/core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java index 9103636a7d8a..1fd74d3da899 100644 --- a/core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java +++ b/core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java @@ -36,6 +36,10 @@ public interface AuthConfig { @Value.Redacted String token(); + @Nullable + @Value.Redacted + String refreshToken(); + @Nullable String tokenType(); @@ -75,6 +79,7 @@ static AuthConfig fromProperties(Map properties) { return builder() .credential(properties.get(OAuth2Properties.CREDENTIAL)) .token(properties.get(OAuth2Properties.TOKEN)) + .refreshToken(properties.get(OAuth2Properties.REFRESH_TOKEN)) .scope(properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE)) .oauth2ServerUri( properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens())) diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java index 295e24519129..a48850c8e622 100644 --- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java +++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java @@ -24,6 +24,9 @@ private OAuth2Properties() {} /** A Bearer token which will be used for interaction with the server. */ public static final String TOKEN = "token"; + /** A refresh token which could be used to refresh access tokens. */ + public static final String REFRESH_TOKEN = "refresh-token"; + /** A credential to exchange for a token in the OAuth2 client credentials flow. */ public static final String CREDENTIAL = "credential"; diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java index 2bcf592d2aab..ebe91e632ea7 100644 --- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java +++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java @@ -61,7 +61,8 @@ private OAuth2Util() {} private static final Logger LOG = LoggerFactory.getLogger(OAuth2Util.class); - // valid scope tokens are from ascii 0x21 to 0x7E, excluding 0x22 (") and 0x5C (\) + // valid scope tokens are from ascii 0x21 to 0x7E, excluding 0x22 (") and 0x5C + // (\) private static final Pattern VALID_SCOPE_TOKEN = Pattern.compile("^[!-~&&[^\"\\\\]]+$"); private static final Splitter SCOPE_DELIMITER = Splitter.on(" "); private static final Joiner SCOPE_JOINER = Joiner.on(" "); @@ -80,6 +81,15 @@ private OAuth2Util() {} private static final String CLIENT_ID = "client_id"; private static final String CLIENT_SECRET = "client_secret"; + // Refresh token flow (https://datatracker.ietf.org/doc/html/rfc6749#section-6) + // The string refresh_token/refreshToken is overloaded: + // - The refresh token as a method is named refreshTokenFlow. + // - The auth config property is named/keyed refresh-token. + // - Everywhere else: + // - when used as a plain text value, it means the grant type. + // - when used as class properties, used to retrieve the actual token. + private static final String REFRESH_TOKEN = "refresh_token"; + // Token exchange flow private static final String SUBJECT_TOKEN = "subject_token"; private static final String SUBJECT_TOKEN_TYPE = "subject_token_type"; @@ -133,7 +143,8 @@ public static String toScope(Iterable scopes) { public static Map buildOptionalParam(Map properties) { // these are some options oauth params based on specification - // for any new optional oauth param, define the constant and add the constant to this list + // for any new optional oauth param, define the constant and add the constant to + // this list Set optionalParamKeys = ImmutableSet.of(OAuth2Properties.AUDIENCE, OAuth2Properties.RESOURCE); ImmutableMap.Builder optionalParamBuilder = ImmutableMap.builder(); @@ -151,7 +162,23 @@ public static Map buildOptionalParam(Map propert return optionalParamBuilder.buildKeepingLast(); } - private static OAuthTokenResponse refreshToken( + private static OAuthTokenResponse refreshTokenFlow( + RESTClient client, Map headers, String refreshToken, String oauth2ServerUri) { + Map request = refreshTokenRequest(refreshToken); + + OAuthTokenResponse response = + client.postForm( + oauth2ServerUri, + request, + OAuthTokenResponse.class, + headers, + ErrorHandlers.oauthErrorHandler()); + response.validate(); + + return response; + } + + private static OAuthTokenResponse exchangeToken( RESTClient client, Map headers, String subjectToken, @@ -292,6 +319,16 @@ public static OAuthTokenResponse fetchToken( return fetchToken(client, headers, credential, scope, oauth2ServerUri, ImmutableMap.of()); } + private static Map refreshTokenRequest(String refreshToken) { + Preconditions.checkArgument( + refreshToken == null || !refreshToken.isEmpty(), "Refresh token provided but it is empty."); + ImmutableMap.Builder formData = ImmutableMap.builder(); + formData.put(GRANT_TYPE, REFRESH_TOKEN); + formData.put(REFRESH_TOKEN, refreshToken); + + return formData.buildKeepingLast(); + } + private static Map tokenExchangeRequest( String subjectToken, String subjectTokenType, @@ -482,6 +519,10 @@ public String token() { return config.token(); } + public String refreshToken() { + return config.refreshToken(); + } + public String tokenType() { return config.tokenType(); } @@ -587,9 +628,9 @@ private OAuthTokenResponse refreshCurrentToken(RESTClient client) { if (null != expiresAtMillis() && expiresAtMillis() <= System.currentTimeMillis()) { // the token has already expired, attempt to refresh using the credential return refreshExpiredToken(client); - } else { + } else if (refreshToken() == null) { // attempt a normal refresh - return refreshToken( + return exchangeToken( client, headers(), token(), @@ -597,21 +638,30 @@ private OAuthTokenResponse refreshCurrentToken(RESTClient client) { scope(), oauth2ServerUri(), optionalOAuthParams()); + } else if (credential() != null) { + Map basicHeaders = + RESTUtil.merge(headers(), basicAuthHeaders(credential())); + return refreshTokenFlow(client, basicHeaders, refreshToken(), oauth2ServerUri()); } + return null; } private OAuthTokenResponse refreshExpiredToken(RESTClient client) { if (credential() != null) { Map basicHeaders = RESTUtil.merge(headers(), basicAuthHeaders(credential())); - return refreshToken( - client, - basicHeaders, - token(), - tokenType(), - scope(), - oauth2ServerUri(), - optionalOAuthParams()); + if (refreshToken() == null) { + return exchangeToken( + client, + basicHeaders, + token(), + tokenType(), + scope(), + oauth2ServerUri(), + optionalOAuthParams()); + } else { + return refreshTokenFlow(client, basicHeaders, refreshToken(), oauth2ServerUri()); + } } return null; diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 696240bb6da2..487c8a21a9f8 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -820,7 +820,8 @@ public void testTableIDToken(String oauth2ServerUri) { ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"), ImmutableMap.of( "Authorization", - "Bearer token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog"), + "Bearer" + + " token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog"), oauth2ServerUri); } @@ -1390,6 +1391,108 @@ public void testCatalogTokenRefresh(String oauth2ServerUri) { }); } + @ParameterizedTest + @ValueSource(strings = {"v1/oauth/tokens", "https://auth-server.com/token"}) + public void testCatalogTokenRefreshByRefreshTokenFlow(String oauth2ServerUri) { + Map emptyHeaders = ImmutableMap.of(); + // Catalog headers are used to send requests to the Catalog REST endpoint. + Map catalogHeaders = + ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog"); + // Basic headers are used to send requests to the oauth2 server enepoint. + Map basicHeaders = OAuth2Util.basicAuthHeaders("catalog:secret"); + + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + Answer addOneSecondExpiration = + invocation -> { + OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod(); + return OAuthTokenResponse.builder() + .withToken(response.token()) + .withTokenType(response.tokenType()) + .withIssuedTokenType(response.issuedTokenType()) + .addScopes(response.scopes()) + .setExpirationInSeconds(1) + .build(); + }; + + Mockito.doAnswer(addOneSecondExpiration) + .when(adapter) + .postForm(eq(oauth2ServerUri), anyMap(), eq(OAuthTokenResponse.class), anyMap(), any()); + + Map contextCredentials = ImmutableMap.of(); + SessionCatalog.SessionContext context = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of()); + + RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter); + catalog.initialize( + "prod", + ImmutableMap.of( + CatalogProperties.URI, + "ignored", + "credential", + "catalog:secret", + "refresh-token", // the property triggers the refresh token flow + "refreshToken", + OAuth2Properties.OAUTH2_SERVER_URI, + oauth2ServerUri)); + + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + // call client credentials with no initial auth + Mockito.verify(adapter) + .execute( + reqMatcher(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), + eq(OAuthTokenResponse.class), + any(), + any()); + + // use the client credential token for config + Mockito.verify(adapter) + .execute( + reqMatcher(HTTPMethod.GET, "v1/config", catalogHeaders), + eq(ConfigResponse.class), + any(), + any()); + + // verify the first token refresh + Map firstRefreshRequest = + ImmutableMap.of( + "grant_type", "refresh_token", + "refresh_token", "refreshToken"); + Mockito.verify(adapter) + .execute( + reqMatcher( + HTTPMethod.POST, + oauth2ServerUri, + basicHeaders, + Map.of(), + firstRefreshRequest), + eq(OAuthTokenResponse.class), + any(), + any()); + + // verify that a second refresh occurs + Map secondRefreshRequest = + ImmutableMap.of( + "grant_type", "refresh_token", + "refresh_token", "refreshToken"); + Mockito.verify(adapter) + .execute( + reqMatcher( + HTTPMethod.POST, + oauth2ServerUri, + basicHeaders, + Map.of(), + secondRefreshRequest), + eq(OAuthTokenResponse.class), + any(), + any()); + }); + } + @ParameterizedTest @ValueSource(strings = {"v1/oauth/tokens", "https://auth-server.com/token"}) public void testCatalogRefreshedTokenIsUsed(String oauth2ServerUri) {