Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.auth.AuthConfig;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
Expand Down Expand Up @@ -213,12 +214,13 @@ private AuthSession authSession() {
expiresAtMillis(properties()),
new AuthSession(
ImmutableMap.of(),
token,
null,
credential(),
SCOPE,
oauth2ServerUri(),
optionalOAuthParams())));
AuthConfig.builder()
.token(token)
.credential(credential())
.scope(SCOPE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider SCOPE as one of the optional params. It is optional in both token exchange flow and client credential flow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes SCOPE is optional, but the signer client uses a custom scope, which we need to provide here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get the scope from the optional OAuth parameters for signer. Also a rest catalog isn't necessary with signer. But I'm OK with either one.

.oauth2ServerUri(oauth2ServerUri())
.optionalOAuthParams(optionalOAuthParams())
.build())));
}

if (credentialProvided()) {
Expand All @@ -229,12 +231,12 @@ private AuthSession authSession() {
AuthSession session =
new AuthSession(
ImmutableMap.of(),
null,
null,
credential(),
SCOPE,
oauth2ServerUri(),
optionalOAuthParams());
AuthConfig.builder()
.credential(credential())
.scope(SCOPE)
.oauth2ServerUri(oauth2ServerUri())
.optionalOAuthParams(optionalOAuthParams())
.build());
long startTimeMillis = System.currentTimeMillis();
OAuthTokenResponse authResponse =
OAuth2Util.fetchToken(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.auth.AuthConfig;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
Expand Down Expand Up @@ -219,7 +220,13 @@ public void initialize(String name, Map<String, String> unresolved) {
String token = mergedProps.get(OAuth2Properties.TOKEN);
this.catalogAuth =
new AuthSession(
baseHeaders, null, null, credential, scope, oauth2ServerUri, optionalOAuthParams);
baseHeaders,
AuthConfig.builder()
.credential(credential)
.scope(scope)
.oauth2ServerUri(oauth2ServerUri)
.optionalOAuthParams(optionalOAuthParams)
.build());
if (authResponse != null) {
this.catalogAuth =
AuthSession.fromTokenResponse(
Expand Down
72 changes: 72 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.rest.ResourcePaths;
import org.immutables.value.Value;

/**
* The purpose of this class is to hold configuration options for {@link
* org.apache.iceberg.rest.auth.OAuth2Util.AuthSession}.
*/
@Value.Style(redactedMask = "****")
@SuppressWarnings("ImmutablesStyle")
@Value.Immutable
public interface AuthConfig {
@Nullable
@Value.Redacted
String token();

@Nullable
String tokenType();

@Nullable
@Value.Redacted
String credential();

@Value.Default
default String scope() {
return OAuth2Properties.CATALOG_SCOPE;
}

@Value.Lazy
@Nullable
default Long expiresAtMillis() {
return OAuth2Util.expiresAtMillis(token());
}

@Value.Default
default boolean keepRefreshed() {
return true;
}

@Nullable
@Value.Default
default String oauth2ServerUri() {
return ResourcePaths.tokens();
}

Map<String, String> optionalOAuthParams();

static ImmutableAuthConfig.Builder builder() {
return ImmutableAuthConfig.builder();
}
}
151 changes: 72 additions & 79 deletions core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,32 +458,11 @@ public static class AuthSession {
private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
private static final long MIN_REFRESH_WAIT_MILLIS = 10;
private volatile Map<String, String> headers;
private volatile String token;
private volatile String tokenType;
private volatile Long expiresAtMillis;
private final String credential;
private final String scope;
private volatile boolean keepRefreshed = true;
private final String oauth2ServerUri;
private volatile AuthConfig config;

private Map<String, String> optionalOAuthParams = ImmutableMap.of();

public AuthSession(
Map<String, String> baseHeaders,
String token,
String tokenType,
String credential,
String scope,
String oauth2ServerUri,
Map<String, String> optionalOAuthParams) {
this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
this.token = token;
this.tokenType = tokenType;
this.expiresAtMillis = OAuth2Util.expiresAtMillis(token);
this.credential = credential;
this.scope = scope;
this.oauth2ServerUri = oauth2ServerUri;
this.optionalOAuthParams = optionalOAuthParams;
public AuthSession(Map<String, String> baseHeaders, AuthConfig config) {
this.headers = RESTUtil.merge(baseHeaders, authHeaders(config.token()));
this.config = config;
Comment on lines -471 to +465
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's public, unfortunately I don't think we can remove this constructor just yet, I think we'll have to deprecate like the other constructors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amogh-jahagirdar that constructor was introduced with 5f655a3 which didn't make it into any release and only exists on main

}

/** @deprecated since 1.5.0, will be removed in 1.6.0 */
Expand All @@ -494,13 +473,14 @@ public AuthSession(
String tokenType,
String credential,
String scope) {
this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
this.token = token;
this.tokenType = tokenType;
this.expiresAtMillis = OAuth2Util.expiresAtMillis(token);
this.credential = credential;
this.scope = scope;
this.oauth2ServerUri = ResourcePaths.tokens();
this(
baseHeaders,
AuthConfig.builder()
.token(token)
.tokenType(tokenType)
.credential(credential)
.scope(scope)
.build());
}

/** @deprecated since 1.6.0, will be removed in 1.7.0 */
Expand All @@ -512,50 +492,55 @@ public AuthSession(
String credential,
String scope,
String oauth2ServerUri) {
this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
this.token = token;
this.tokenType = tokenType;
this.expiresAtMillis = OAuth2Util.expiresAtMillis(token);
this.credential = credential;
this.scope = scope;
this.oauth2ServerUri = oauth2ServerUri;
this.optionalOAuthParams = ImmutableMap.of();
this(
baseHeaders,
AuthConfig.builder()
.token(token)
.tokenType(tokenType)
.credential(credential)
.scope(scope)
.oauth2ServerUri(oauth2ServerUri)
.build());
}

public Map<String, String> headers() {
return headers;
}

public String token() {
return token;
return config.token();
}

public String tokenType() {
return tokenType;
return config.tokenType();
}

public Long expiresAtMillis() {
return expiresAtMillis;
return config.expiresAtMillis();
}

public String scope() {
return scope;
return config.scope();
}

public void stopRefreshing() {
this.keepRefreshed = false;
public synchronized void stopRefreshing() {
this.config = ImmutableAuthConfig.copyOf(config).withKeepRefreshed(false);
}

public String credential() {
return credential;
return config.credential();
}

public String oauth2ServerUri() {
return oauth2ServerUri;
return config.oauth2ServerUri();
}

public Map<String, String> optionalOAuthParams() {
return optionalOAuthParams;
return config.optionalOAuthParams();
}

public AuthConfig config() {
return config;
}

@VisibleForTesting
Expand All @@ -569,14 +554,7 @@ static void setTokenRefreshNumRetries(int retries) {
* @return A new {@link AuthSession} with empty headers.
*/
public static AuthSession empty() {
return new AuthSession(
ImmutableMap.of(),
null,
null,
null,
OAuth2Properties.CATALOG_SCOPE,
null,
ImmutableMap.of());
return new AuthSession(ImmutableMap.of(), AuthConfig.builder().build());
}

/**
Expand All @@ -586,7 +564,7 @@ public static AuthSession empty() {
* @return interval to wait before calling refresh again, or null if no refresh is needed
*/
public Pair<Integer, TimeUnit> refresh(RESTClient client) {
if (token != null && keepRefreshed) {
if (token() != null && config.keepRefreshed()) {
AtomicReference<OAuthTokenResponse> ref = new AtomicReference<>(null);
boolean isSuccessful =
Tasks.foreach(ref)
Expand All @@ -612,10 +590,13 @@ public Pair<Integer, TimeUnit> refresh(RESTClient client) {
}

OAuthTokenResponse response = ref.get();
this.token = response.token();
this.tokenType = response.issuedTokenType();
this.expiresAtMillis = OAuth2Util.expiresAtMillis(token);
this.headers = RESTUtil.merge(headers, authHeaders(token));
this.config =
AuthConfig.builder()
.from(config())
.token(response.token())
.tokenType(response.issuedTokenType())
.build();
this.headers = RESTUtil.merge(headers, authHeaders(config.token()));

if (response.expiresInSeconds() != null) {
return Pair.of(response.expiresInSeconds(), TimeUnit.SECONDS);
Expand All @@ -626,21 +607,34 @@ public Pair<Integer, TimeUnit> refresh(RESTClient client) {
}

private OAuthTokenResponse refreshCurrentToken(RESTClient client) {
if (null != expiresAtMillis && expiresAtMillis <= System.currentTimeMillis()) {
if (null != expiresAtMillis() && expiresAtMillis() <= System.currentTimeMillis()) {
// the token has already expired, attempt to refresh using the credential
return refreshExpiredToken(client);
} else {
// attempt a normal refresh
return refreshToken(
client, headers(), token, tokenType, scope, oauth2ServerUri, optionalOAuthParams);
client,
headers(),
token(),
tokenType(),
scope(),
oauth2ServerUri(),
optionalOAuthParams());
}
}

private OAuthTokenResponse refreshExpiredToken(RESTClient client) {
if (credential != null) {
Map<String, String> basicHeaders = RESTUtil.merge(headers(), basicAuthHeaders(credential));
if (credential() != null) {
Map<String, String> basicHeaders =
RESTUtil.merge(headers(), basicAuthHeaders(credential()));
return refreshToken(
client, basicHeaders, token, tokenType, scope, oauth2ServerUri, optionalOAuthParams);
client,
basicHeaders,
token(),
tokenType(),
scope(),
oauth2ServerUri(),
optionalOAuthParams());
}

return null;
Expand Down Expand Up @@ -693,12 +687,11 @@ public static AuthSession fromAccessToken(
AuthSession session =
new AuthSession(
parent.headers(),
token,
OAuth2Properties.ACCESS_TOKEN_TYPE,
parent.credential(),
parent.scope(),
parent.oauth2ServerUri(),
parent.optionalOAuthParams());
AuthConfig.builder()
.from(parent.config())
.token(token)
.tokenType(OAuth2Properties.ACCESS_TOKEN_TYPE)
.build());

long startTimeMillis = System.currentTimeMillis();
Long expiresAtMillis = session.expiresAtMillis();
Expand Down Expand Up @@ -766,12 +759,12 @@ private static AuthSession fromTokenResponse(
AuthSession session =
new AuthSession(
parent.headers(),
response.token(),
response.issuedTokenType(),
credential,
parent.scope(),
parent.oauth2ServerUri(),
parent.optionalOAuthParams());
AuthConfig.builder()
.from(parent.config())
.token(response.token())
.tokenType(response.issuedTokenType())
.credential(credential)
.build());

Long expiresAtMillis = session.expiresAtMillis();
if (null == expiresAtMillis && response.expiresInSeconds() != null) {
Expand Down