Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ coverage.xml

# vscode/eclipse files
.classpath
.factorypath
.project
.settings
bin/
Expand Down
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,10 @@ acceptedBreaks:
old: "method void org.apache.iceberg.data.parquet.BaseParquetWriter<T>::<init>()"
new: "method void org.apache.iceberg.data.parquet.BaseParquetWriter<T>::<init>()"
justification: "Changing deprecated code"
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.rest.auth.AuthConfig::refreshToken()"
justification: "Adding an additional refresh token config to support the refresh\
\ token OAuth flow."
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,19 @@ public void initialize(String name, Map<String, String> unresolved) {
OAuthTokenResponse authResponse;
String credential = props.get(OAuth2Properties.CREDENTIAL);
boolean hasCredential = credential != null && !credential.isEmpty();
String refreshToken = props.get(OAuth2Properties.REFRESH_TOKEN);
boolean hasRefreshToken = refreshToken != null;
String scope = props.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);
Map<String, String> optionalOAuthParams = OAuth2Util.buildOptionalParam(props);
if (!props.containsKey(OAuth2Properties.OAUTH2_SERVER_URI)
&& (hasInitToken || hasCredential)
&& (hasInitToken || hasCredential || hasRefreshToken)
&& !PropertyUtil.propertyAsBoolean(props, "rest.sigv4-enabled", false)) {
LOG.warn(
"Iceberg REST client is missing the OAuth2 server URI configuration and defaults to {}/{}. "
+ "This automatic fallback will be removed in a future Iceberg release."
+ "It is recommended to configure the OAuth2 endpoint using the '{}' property to be prepared. "
+ "This warning will disappear if the OAuth2 endpoint is explicitly configured. "
+ "See https://github.com/apache/iceberg/issues/10537",
"Iceberg REST client is missing the OAuth2 server URI configuration and defaults to"
+ " {}/{}. This automatic fallback will be removed in a future Iceberg release.It is"
+ " recommended to configure the OAuth2 endpoint using the '{}' property to be"
+ " prepared. This warning will disappear if the OAuth2 endpoint is explicitly"
+ " configured. See https://github.com/apache/iceberg/issues/10537",
RESTUtil.stripTrailingSlash(props.get(CatalogProperties.URI)),
ResourcePaths.tokens(),
OAuth2Properties.OAUTH2_SERVER_URI);
Expand Down Expand Up @@ -294,6 +296,7 @@ public void initialize(String name, Map<String, String> unresolved) {
baseHeaders,
AuthConfig.builder()
.credential(credential)
.refreshToken(refreshToken)
.scope(scope)
.oauth2ServerUri(oauth2ServerUri)
.optionalOAuthParams(optionalOAuthParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public interface AuthConfig {
@Value.Redacted
String token();

@Nullable
@Value.Redacted
String refreshToken();

@Nullable
String tokenType();

Expand Down Expand Up @@ -75,6 +79,7 @@ static AuthConfig fromProperties(Map<String, String> properties) {
return builder()
.credential(properties.get(OAuth2Properties.CREDENTIAL))
.token(properties.get(OAuth2Properties.TOKEN))
.refreshToken(properties.get(OAuth2Properties.REFRESH_TOKEN))
.scope(properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE))
.oauth2ServerUri(
properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ private OAuth2Properties() {}
/** A Bearer token which will be used for interaction with the server. */
public static final String TOKEN = "token";

/** A refresh token which could be used to refresh access tokens. */
public static final String REFRESH_TOKEN = "refresh-token";

/** A credential to exchange for a token in the OAuth2 client credentials flow. */
public static final String CREDENTIAL = "credential";

Expand Down
76 changes: 63 additions & 13 deletions core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ private OAuth2Util() {}

private static final Logger LOG = LoggerFactory.getLogger(OAuth2Util.class);

// valid scope tokens are from ascii 0x21 to 0x7E, excluding 0x22 (") and 0x5C (\)
// valid scope tokens are from ascii 0x21 to 0x7E, excluding 0x22 (") and 0x5C
// (\)
private static final Pattern VALID_SCOPE_TOKEN = Pattern.compile("^[!-~&&[^\"\\\\]]+$");
private static final Splitter SCOPE_DELIMITER = Splitter.on(" ");
private static final Joiner SCOPE_JOINER = Joiner.on(" ");
Expand All @@ -80,6 +81,15 @@ private OAuth2Util() {}
private static final String CLIENT_ID = "client_id";
private static final String CLIENT_SECRET = "client_secret";

// Refresh token flow (https://datatracker.ietf.org/doc/html/rfc6749#section-6)
// The string refresh_token/refreshToken is overloaded:
// - The refresh token as a method is named refreshTokenFlow.
// - The auth config property is named/keyed refresh-token.
// - Everywhere else:
// - when used as a plain text value, it means the grant type.
// - when used as class properties, used to retrieve the actual token.
private static final String REFRESH_TOKEN = "refresh_token";

// Token exchange flow
private static final String SUBJECT_TOKEN = "subject_token";
private static final String SUBJECT_TOKEN_TYPE = "subject_token_type";
Expand Down Expand Up @@ -133,7 +143,8 @@ public static String toScope(Iterable<String> scopes) {

public static Map<String, String> buildOptionalParam(Map<String, String> properties) {
// these are some options oauth params based on specification
// for any new optional oauth param, define the constant and add the constant to this list
// for any new optional oauth param, define the constant and add the constant to
// this list
Set<String> optionalParamKeys =
ImmutableSet.of(OAuth2Properties.AUDIENCE, OAuth2Properties.RESOURCE);
ImmutableMap.Builder<String, String> optionalParamBuilder = ImmutableMap.builder();
Expand All @@ -151,7 +162,23 @@ public static Map<String, String> buildOptionalParam(Map<String, String> propert
return optionalParamBuilder.buildKeepingLast();
}

private static OAuthTokenResponse refreshToken(
private static OAuthTokenResponse refreshTokenFlow(
RESTClient client, Map<String, String> headers, String refreshToken, String oauth2ServerUri) {
Map<String, String> request = refreshTokenRequest(refreshToken);

OAuthTokenResponse response =
client.postForm(
oauth2ServerUri,
request,
OAuthTokenResponse.class,
headers,
ErrorHandlers.oauthErrorHandler());
response.validate();

return response;
}

private static OAuthTokenResponse exchangeToken(
RESTClient client,
Map<String, String> headers,
String subjectToken,
Expand Down Expand Up @@ -292,6 +319,16 @@ public static OAuthTokenResponse fetchToken(
return fetchToken(client, headers, credential, scope, oauth2ServerUri, ImmutableMap.of());
}

private static Map<String, String> refreshTokenRequest(String refreshToken) {
Preconditions.checkArgument(
refreshToken == null || !refreshToken.isEmpty(), "Refresh token provided but it is empty.");
ImmutableMap.Builder<String, String> formData = ImmutableMap.builder();
formData.put(GRANT_TYPE, REFRESH_TOKEN);
formData.put(REFRESH_TOKEN, refreshToken);

return formData.buildKeepingLast();
}

private static Map<String, String> tokenExchangeRequest(
String subjectToken,
String subjectTokenType,
Expand Down Expand Up @@ -482,6 +519,10 @@ public String token() {
return config.token();
}

public String refreshToken() {
return config.refreshToken();
}

public String tokenType() {
return config.tokenType();
}
Expand Down Expand Up @@ -587,31 +628,40 @@ private OAuthTokenResponse refreshCurrentToken(RESTClient client) {
if (null != expiresAtMillis() && expiresAtMillis() <= System.currentTimeMillis()) {
// the token has already expired, attempt to refresh using the credential
return refreshExpiredToken(client);
} else {
} else if (refreshToken() == null) {
// attempt a normal refresh
return refreshToken(
return exchangeToken(
client,
headers(),
token(),
tokenType(),
scope(),
oauth2ServerUri(),
optionalOAuthParams());
} else if (credential() != null) {
Map<String, String> basicHeaders =
RESTUtil.merge(headers(), basicAuthHeaders(credential()));
return refreshTokenFlow(client, basicHeaders, refreshToken(), oauth2ServerUri());
}
return null;
}

private OAuthTokenResponse refreshExpiredToken(RESTClient client) {
if (credential() != null) {
Map<String, String> basicHeaders =
RESTUtil.merge(headers(), basicAuthHeaders(credential()));
return refreshToken(
client,
basicHeaders,
token(),
tokenType(),
scope(),
oauth2ServerUri(),
optionalOAuthParams());
if (refreshToken() == null) {
return exchangeToken(
client,
basicHeaders,
token(),
tokenType(),
scope(),
oauth2ServerUri(),
optionalOAuthParams());
} else {
return refreshTokenFlow(client, basicHeaders, refreshToken(), oauth2ServerUri());
}
}

return null;
Expand Down
105 changes: 104 additions & 1 deletion core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,8 @@ public void testTableIDToken(String oauth2ServerUri) {
ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
ImmutableMap.of(
"Authorization",
"Bearer token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog"),
"Bearer"
+ " token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog"),
oauth2ServerUri);
}

Expand Down Expand Up @@ -1390,6 +1391,108 @@ public void testCatalogTokenRefresh(String oauth2ServerUri) {
});
}

@ParameterizedTest
@ValueSource(strings = {"v1/oauth/tokens", "https://auth-server.com/token"})
public void testCatalogTokenRefreshByRefreshTokenFlow(String oauth2ServerUri) {
Map<String, String> emptyHeaders = ImmutableMap.of();
// Catalog headers are used to send requests to the Catalog REST endpoint.
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
// Basic headers are used to send requests to the oauth2 server enepoint.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a typo here, will fix it with other changes based on comments.

Map<String, String> basicHeaders = OAuth2Util.basicAuthHeaders("catalog:secret");

RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));

Answer<OAuthTokenResponse> addOneSecondExpiration =
invocation -> {
OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
return OAuthTokenResponse.builder()
.withToken(response.token())
.withTokenType(response.tokenType())
.withIssuedTokenType(response.issuedTokenType())
.addScopes(response.scopes())
.setExpirationInSeconds(1)
.build();
};

Mockito.doAnswer(addOneSecondExpiration)
.when(adapter)
.postForm(eq(oauth2ServerUri), anyMap(), eq(OAuthTokenResponse.class), anyMap(), any());

Map<String, String> contextCredentials = ImmutableMap.of();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());

RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
"credential",
"catalog:secret",
"refresh-token", // the property triggers the refresh token flow
"refreshToken",
OAuth2Properties.OAUTH2_SERVER_URI,
oauth2ServerUri));

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
reqMatcher(HTTPMethod.POST, oauth2ServerUri, emptyHeaders),
eq(OAuthTokenResponse.class),
any(),
any());

// use the client credential token for config
Mockito.verify(adapter)
.execute(
reqMatcher(HTTPMethod.GET, "v1/config", catalogHeaders),
eq(ConfigResponse.class),
any(),
any());

// verify the first token refresh
Map<String, String> firstRefreshRequest =
ImmutableMap.of(
"grant_type", "refresh_token",
"refresh_token", "refreshToken");
Mockito.verify(adapter)
.execute(
reqMatcher(
HTTPMethod.POST,
oauth2ServerUri,
basicHeaders,
Map.of(),
firstRefreshRequest),
eq(OAuthTokenResponse.class),
any(),
any());

// verify that a second refresh occurs
Map<String, String> secondRefreshRequest =
ImmutableMap.of(
"grant_type", "refresh_token",
"refresh_token", "refreshToken");
Mockito.verify(adapter)
.execute(
reqMatcher(
HTTPMethod.POST,
oauth2ServerUri,
basicHeaders,
Map.of(),
secondRefreshRequest),
eq(OAuthTokenResponse.class),
any(),
any());
});
}

@ParameterizedTest
@ValueSource(strings = {"v1/oauth/tokens", "https://auth-server.com/token"})
public void testCatalogRefreshedTokenIsUsed(String oauth2ServerUri) {
Expand Down
Loading