diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 11a6aa6f27da..8a81a1a161aa 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -29,9 +29,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -70,10 +67,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.rest.auth.AuthConfig; -import org.apache.iceberg.rest.auth.OAuth2Properties; -import org.apache.iceberg.rest.auth.OAuth2Util; -import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession; +import org.apache.iceberg.rest.auth.AuthManager; +import org.apache.iceberg.rest.auth.AuthManagers; +import org.apache.iceberg.rest.auth.AuthSession; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -91,12 +87,10 @@ import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; -import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.EnvironmentUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; -import org.apache.iceberg.util.ThreadPools; import org.apache.iceberg.view.BaseView; import org.apache.iceberg.view.ImmutableSQLViewRepresentation; import org.apache.iceberg.view.ImmutableViewVersion; @@ -116,21 +110,13 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled"; private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"; public static final String REST_PAGE_SIZE = "rest-page-size"; - private static final List TOKEN_PREFERENCE_ORDER = - ImmutableList.of( - OAuth2Properties.ID_TOKEN_TYPE, - OAuth2Properties.ACCESS_TOKEN_TYPE, - OAuth2Properties.JWT_TOKEN_TYPE, - OAuth2Properties.SAML2_TOKEN_TYPE, - OAuth2Properties.SAML1_TOKEN_TYPE); - private final Function, RESTClient> clientBuilder; private final BiFunction, FileIO> ioBuilder; private Cache sessions = null; private Cache tableSessions = null; private Cache fileIOCloser; private AuthSession catalogAuth = null; - private boolean keepTokenRefreshed = true; + private AuthManager authManager; private RESTClient client = null; private ResourcePaths paths = null; private SnapshotMode snapshotMode = null; @@ -141,9 +127,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Integer pageSize = null; private CloseableGroup closeables = null; - // a lazy thread pool for token refresh - private volatile ScheduledExecutorService refreshExecutor = null; - enum SnapshotMode { ALL, REFS; @@ -172,33 +155,14 @@ public void initialize(String name, Map unresolved) { // note that this is only done for local config properties and not for properties from the // catalog service Map props = EnvironmentUtil.resolveAll(unresolved); + Map configuredHeaders = configHeaders(props); + this.authManager = AuthManagers.loadAuthManager(name, props); - long startTimeMillis = - System.currentTimeMillis(); // keep track of the init start time for token refresh - String initToken = props.get(OAuth2Properties.TOKEN); - - // fetch auth and config to complete initialization ConfigResponse config; - OAuthTokenResponse authResponse; - String credential = props.get(OAuth2Properties.CREDENTIAL); - String scope = props.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE); - Map optionalOAuthParams = OAuth2Util.buildOptionalParam(props); - String oauth2ServerUri = - props.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()); try (RESTClient initClient = clientBuilder.apply(props)) { - Map initHeaders = - RESTUtil.merge(configHeaders(props), OAuth2Util.authHeaders(initToken)); - if (credential != null && !credential.isEmpty()) { - authResponse = - OAuth2Util.fetchToken( - initClient, initHeaders, credential, scope, oauth2ServerUri, optionalOAuthParams); - Map authHeaders = - RESTUtil.merge(initHeaders, OAuth2Util.authHeaders(authResponse.token())); - config = fetchConfig(initClient, authHeaders, props); - } else { - authResponse = null; - config = fetchConfig(initClient, initHeaders, props); - } + Map authAndConfiguredHeaders = + authManager.mergeAuthHeadersForGetConfig(initClient, configuredHeaders); + config = fetchConfig(initClient, authAndConfiguredHeaders, props); } catch (IOException e) { throw new UncheckedIOException("Failed to close HTTP client", e); } @@ -209,33 +173,9 @@ public void initialize(String name, Map unresolved) { this.sessions = newSessionCache(mergedProps); this.tableSessions = newSessionCache(mergedProps); - this.keepTokenRefreshed = - PropertyUtil.propertyAsBoolean( - mergedProps, - OAuth2Properties.TOKEN_REFRESH_ENABLED, - OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT); this.client = clientBuilder.apply(mergedProps); this.paths = ResourcePaths.forCatalogProperties(mergedProps); - - String token = mergedProps.get(OAuth2Properties.TOKEN); - this.catalogAuth = - new AuthSession( - baseHeaders, - AuthConfig.builder() - .credential(credential) - .scope(scope) - .oauth2ServerUri(oauth2ServerUri) - .optionalOAuthParams(optionalOAuthParams) - .build()); - if (authResponse != null) { - this.catalogAuth = - AuthSession.fromTokenResponse( - client, tokenRefreshExecutor(name), authResponse, startTimeMillis, catalogAuth); - } else if (token != null) { - this.catalogAuth = - AuthSession.fromAccessToken( - client, tokenRefreshExecutor(name), token, expiresAtMillis(mergedProps), catalogAuth); - } + this.catalogAuth = authManager.newSession(client, mergedProps, baseHeaders); this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE); if (pageSize != null) { @@ -249,6 +189,7 @@ public void initialize(String name, Map unresolved) { this.closeables = new CloseableGroup(); this.closeables.addCloseable(this.io); this.closeables.addCloseable(this.client); + this.closeables.addCloseable(this.authManager); this.closeables.setSuppressCloseFailure(true); this.snapshotMode = @@ -270,7 +211,8 @@ private AuthSession session(SessionContext context) { context.sessionId(), id -> { Pair> newSession = - newSession(context.credentials(), context.properties(), catalogAuth); + authManager.newSessionSupplier( + context.credentials(), context.properties(), catalogAuth); if (null != newSession) { return newSession.second().get(); } @@ -595,26 +537,8 @@ public boolean updateNamespaceMetadata( return !response.updated().isEmpty(); } - private ScheduledExecutorService tokenRefreshExecutor(String catalogName) { - if (!keepTokenRefreshed) { - return null; - } - - if (refreshExecutor == null) { - synchronized (this) { - if (refreshExecutor == null) { - this.refreshExecutor = ThreadPools.newScheduledPool(catalogName + "-token-refresh", 1); - } - } - } - - return refreshExecutor; - } - @Override public void close() throws IOException { - shutdownRefreshExecutor(); - if (closeables != null) { closeables.close(); } @@ -625,30 +549,6 @@ public void close() throws IOException { } } - private void shutdownRefreshExecutor() { - if (refreshExecutor != null) { - ScheduledExecutorService service = refreshExecutor; - this.refreshExecutor = null; - - List tasks = service.shutdownNow(); - tasks.forEach( - task -> { - if (task instanceof Future) { - ((Future) task).cancel(true); - } - }); - - try { - if (!service.awaitTermination(1, TimeUnit.MINUTES)) { - LOG.warn("Timed out waiting for refresh executor to terminate"); - } - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for refresh executor to terminate", e); - Thread.currentThread().interrupt(); - } - } - } - private class Builder implements Catalog.TableBuilder { private final TableIdentifier ident; private final Schema schema; @@ -922,7 +822,8 @@ private FileIO tableFileIO(SessionContext context, Map config) { } private AuthSession tableSession(Map tableConf, AuthSession parent) { - Pair> newSession = newSession(tableConf, tableConf, parent); + Pair> newSession = + authManager.newSessionSupplier(tableConf, tableConf, parent); if (null == newSession) { return parent; } @@ -957,66 +858,6 @@ private static ConfigResponse fetchConfig( return configResponse; } - private Pair> newSession( - Map credentials, Map properties, AuthSession parent) { - if (credentials != null) { - // use the bearer token without exchanging - if (credentials.containsKey(OAuth2Properties.TOKEN)) { - return Pair.of( - credentials.get(OAuth2Properties.TOKEN), - () -> - AuthSession.fromAccessToken( - client, - tokenRefreshExecutor(name()), - credentials.get(OAuth2Properties.TOKEN), - expiresAtMillis(properties), - parent)); - } - - if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) { - // fetch a token using the client credentials flow - return Pair.of( - credentials.get(OAuth2Properties.CREDENTIAL), - () -> - AuthSession.fromCredential( - client, - tokenRefreshExecutor(name()), - credentials.get(OAuth2Properties.CREDENTIAL), - parent)); - } - - for (String tokenType : TOKEN_PREFERENCE_ORDER) { - if (credentials.containsKey(tokenType)) { - // exchange the token for an access token using the token exchange flow - return Pair.of( - credentials.get(tokenType), - () -> - AuthSession.fromTokenExchange( - client, - tokenRefreshExecutor(name()), - credentials.get(tokenType), - tokenType, - parent)); - } - } - } - - return null; - } - - private Long expiresAtMillis(Map properties) { - if (properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) { - long expiresInMillis = - PropertyUtil.propertyAsLong( - properties, - OAuth2Properties.TOKEN_EXPIRES_IN_MS, - OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT); - return System.currentTimeMillis() + expiresInMillis; - } else { - return null; - } - } - private void checkIdentifierIsValid(TableIdentifier tableIdentifier) { if (tableIdentifier.namespace().isEmpty()) { throw new NoSuchTableException("Invalid table identifier: %s", tableIdentifier); diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/AuthManager.java b/core/src/main/java/org/apache/iceberg/rest/auth/AuthManager.java new file mode 100644 index 000000000000..ae0e55e2837f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/auth/AuthManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.auth; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.util.Pair; + +public interface AuthManager extends AutoCloseable { + + Map mergeAuthHeadersForGetConfig( + RESTClient initialAuthClient, Map configuredHeaders); + + AuthSession newSession( + RESTClient authClient, Map mergedProps, Map baseHeaders); + + Pair> newSessionSupplier( + Map credentials, Map properties, AuthSession parent); + + void initialize(String managerName, Map properties); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java b/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java new file mode 100644 index 000000000000..fdb3253234da --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.auth; + +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AuthManagers { + + private static final Logger LOG = LoggerFactory.getLogger(AuthManagers.class); + + public static final String AUTH_MANAGER_IMPL = "auth-manager-impl"; + + private AuthManagers() {} + + public static AuthManager loadAuthManager(String name, Map properties) { + String impl = properties.get(AUTH_MANAGER_IMPL); + if (impl == null) { + return new OAuth2Manager(name, properties); + } + + LOG.info("Loading custom AuthManager implementation: {}", impl); + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(AuthManager.class) + .loader(AuthManagers.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize AuthManager implementation %s: %s", impl, e.getMessage()), + e); + } + + AuthManager authManager; + try { + authManager = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize AuthManager, %s does not implement AuthManager.", impl), + e); + } + + authManager.initialize(name, properties); + return authManager; + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/AuthSession.java b/core/src/main/java/org/apache/iceberg/rest/auth/AuthSession.java new file mode 100644 index 000000000000..dcd9f8e35d68 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/auth/AuthSession.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.auth; + +import java.util.Map; + +public interface AuthSession { + + Map headers(); + + void stopRefreshing(); +} diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java new file mode 100644 index 000000000000..962533e14ddb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.auth; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.ResourcePaths; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuth2Manager implements AuthManager { + + private static final Logger LOG = LoggerFactory.getLogger(OAuth2Manager.class); + + private static final List TOKEN_PREFERENCE_ORDER = + ImmutableList.of( + OAuth2Properties.ID_TOKEN_TYPE, + OAuth2Properties.ACCESS_TOKEN_TYPE, + OAuth2Properties.JWT_TOKEN_TYPE, + OAuth2Properties.SAML2_TOKEN_TYPE, + OAuth2Properties.SAML1_TOKEN_TYPE); + + private String name; + private long startTimeMillis; + private Map authHeaders; + private String credential; + private String scope; + private Map optionalOAuthParams; + private String oauth2ServerUri; + private boolean keepTokenRefreshed = true; + private OAuthTokenResponse authResponse; + + // a lazy thread pool for token refresh + private volatile ScheduledExecutorService refreshExecutor = null; + + private RESTClient client; + + public OAuth2Manager() {} + + public OAuth2Manager(String name, Map properties) { + initialize(name, properties); + } + + @Override + public Map mergeAuthHeadersForGetConfig( + RESTClient initialAuthClient, Map configuredHeaders) { + Map initHeaders = RESTUtil.merge(configuredHeaders, authHeaders); + + if (credential != null && !credential.isEmpty()) { + this.authResponse = + OAuth2Util.fetchToken( + initialAuthClient, + initHeaders, + credential, + scope, + oauth2ServerUri, + optionalOAuthParams); + return RESTUtil.merge(initHeaders, OAuth2Util.authHeaders(authResponse.token())); + } else { + this.authResponse = null; + return initHeaders; + } + } + + @Override + public AuthSession newSession( + RESTClient authClient, Map mergedProps, Map baseHeaders) { + this.client = authClient; + String token = mergedProps.get(OAuth2Properties.TOKEN); + OAuth2Util.AuthSession catalogAuth = + new OAuth2Util.AuthSession( + baseHeaders, + AuthConfig.builder() + .credential(credential) + .scope(scope) + .oauth2ServerUri(oauth2ServerUri) + .optionalOAuthParams(optionalOAuthParams) + .build()); + if (authResponse != null) { + catalogAuth = + OAuth2Util.AuthSession.fromTokenResponse( + authClient, tokenRefreshExecutor(name), authResponse, startTimeMillis, catalogAuth); + } else if (token != null) { + catalogAuth = + OAuth2Util.AuthSession.fromAccessToken( + authClient, + tokenRefreshExecutor(name), + token, + expiresAtMillis(mergedProps), + catalogAuth); + } + return catalogAuth; + } + + @Override + public Pair> newSessionSupplier( + Map credentials, Map properties, AuthSession parent) { + OAuth2Util.AuthSession oauth2Parent = (OAuth2Util.AuthSession) parent; + if (credentials != null) { + // use the bearer token without exchanging + if (credentials.containsKey(OAuth2Properties.TOKEN)) { + return Pair.of( + credentials.get(OAuth2Properties.TOKEN), + () -> + OAuth2Util.AuthSession.fromAccessToken( + client, + tokenRefreshExecutor(name), + credentials.get(OAuth2Properties.TOKEN), + expiresAtMillis(properties), + oauth2Parent)); + } + + if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) { + // fetch a token using the client credentials flow + return Pair.of( + credentials.get(OAuth2Properties.CREDENTIAL), + () -> + OAuth2Util.AuthSession.fromCredential( + client, + tokenRefreshExecutor(name), + credentials.get(OAuth2Properties.CREDENTIAL), + oauth2Parent)); + } + + for (String tokenType : TOKEN_PREFERENCE_ORDER) { + if (credentials.containsKey(tokenType)) { + // exchange the token for an access token using the token exchange flow + return Pair.of( + credentials.get(tokenType), + () -> + OAuth2Util.AuthSession.fromTokenExchange( + client, + tokenRefreshExecutor(name), + credentials.get(tokenType), + tokenType, + oauth2Parent)); + } + } + } + + return null; + } + + @Override + public void close() { + shutdownRefreshExecutor(); + } + + @Override + public void initialize(String managerName, Map properties) { + this.name = managerName; + this.startTimeMillis = + System.currentTimeMillis(); // keep track of the init start time for token refresh + this.authHeaders = OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)); + this.credential = properties.get(OAuth2Properties.CREDENTIAL); + this.scope = properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE); + this.optionalOAuthParams = OAuth2Util.buildOptionalParam(properties); + this.oauth2ServerUri = + properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()); + } + + private ScheduledExecutorService tokenRefreshExecutor(String catalogName) { + if (!keepTokenRefreshed) { + return null; + } + + if (refreshExecutor == null) { + synchronized (this) { + if (refreshExecutor == null) { + this.refreshExecutor = ThreadPools.newScheduledPool(catalogName + "-token-refresh", 1); + } + } + } + + return refreshExecutor; + } + + private Long expiresAtMillis(Map properties) { + if (properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) { + long expiresInMillis = + PropertyUtil.propertyAsLong( + properties, + OAuth2Properties.TOKEN_EXPIRES_IN_MS, + OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT); + return System.currentTimeMillis() + expiresInMillis; + } else { + return null; + } + } + + private void shutdownRefreshExecutor() { + if (refreshExecutor != null) { + ScheduledExecutorService service = refreshExecutor; + this.refreshExecutor = null; + + List tasks = service.shutdownNow(); + tasks.forEach( + task -> { + if (task instanceof Future) { + ((Future) task).cancel(true); + } + }); + + try { + if (!service.awaitTermination(1, TimeUnit.MINUTES)) { + LOG.warn("Timed out waiting for refresh executor to terminate"); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for refresh executor to terminate", e); + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java index 76386027c629..d69dc552a661 100644 --- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java +++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java @@ -453,7 +453,7 @@ static Long expiresAtMillis(String token) { } /** Class to handle authorization headers and token refresh. */ - public static class AuthSession { + public static class AuthSession implements org.apache.iceberg.rest.auth.AuthSession { private static int tokenRefreshNumRetries = 5; private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes private static final long MIN_REFRESH_WAIT_MILLIS = 10; @@ -485,6 +485,7 @@ public AuthSession( .build()); } + @Override public Map headers() { return headers; } @@ -505,6 +506,7 @@ public String scope() { return config.scope(); } + @Override public synchronized void stopRefreshing() { this.config = ImmutableAuthConfig.copyOf(config).withKeepRefreshed(false); }