Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 15 additions & 174 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -70,10 +67,9 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.auth.AuthConfig;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
Expand All @@ -91,12 +87,10 @@
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.BaseView;
import org.apache.iceberg.view.ImmutableSQLViewRepresentation;
import org.apache.iceberg.view.ImmutableViewVersion;
Expand All @@ -116,21 +110,13 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
public static final String REST_PAGE_SIZE = "rest-page-size";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE,
OAuth2Properties.ACCESS_TOKEN_TYPE,
OAuth2Properties.JWT_TOKEN_TYPE,
OAuth2Properties.SAML2_TOKEN_TYPE,
OAuth2Properties.SAML1_TOKEN_TYPE);

private final Function<Map<String, String>, RESTClient> clientBuilder;
private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private Cache<String, AuthSession> tableSessions = null;
private Cache<TableOperations, FileIO> fileIOCloser;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
private AuthManager authManager;
private RESTClient client = null;
private ResourcePaths paths = null;
private SnapshotMode snapshotMode = null;
Expand All @@ -141,9 +127,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private Integer pageSize = null;
private CloseableGroup closeables = null;

// a lazy thread pool for token refresh
private volatile ScheduledExecutorService refreshExecutor = null;

enum SnapshotMode {
ALL,
REFS;
Expand Down Expand Up @@ -172,33 +155,14 @@ public void initialize(String name, Map<String, String> unresolved) {
// note that this is only done for local config properties and not for properties from the
// catalog service
Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
Map<String, String> configuredHeaders = configHeaders(props);
this.authManager = AuthManagers.loadAuthManager(name, props);

long startTimeMillis =
System.currentTimeMillis(); // keep track of the init start time for token refresh
String initToken = props.get(OAuth2Properties.TOKEN);

// fetch auth and config to complete initialization
ConfigResponse config;
OAuthTokenResponse authResponse;
String credential = props.get(OAuth2Properties.CREDENTIAL);
String scope = props.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);
Map<String, String> optionalOAuthParams = OAuth2Util.buildOptionalParam(props);
String oauth2ServerUri =
props.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens());
try (RESTClient initClient = clientBuilder.apply(props)) {
Map<String, String> initHeaders =
RESTUtil.merge(configHeaders(props), OAuth2Util.authHeaders(initToken));
if (credential != null && !credential.isEmpty()) {
authResponse =
OAuth2Util.fetchToken(
initClient, initHeaders, credential, scope, oauth2ServerUri, optionalOAuthParams);
Map<String, String> authHeaders =
RESTUtil.merge(initHeaders, OAuth2Util.authHeaders(authResponse.token()));
config = fetchConfig(initClient, authHeaders, props);
} else {
authResponse = null;
config = fetchConfig(initClient, initHeaders, props);
}
Map<String, String> authAndConfiguredHeaders =
authManager.mergeAuthHeadersForGetConfig(initClient, configuredHeaders);
config = fetchConfig(initClient, authAndConfiguredHeaders, props);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close HTTP client", e);
}
Expand All @@ -209,33 +173,9 @@ public void initialize(String name, Map<String, String> unresolved) {

this.sessions = newSessionCache(mergedProps);
this.tableSessions = newSessionCache(mergedProps);
this.keepTokenRefreshed =
PropertyUtil.propertyAsBoolean(
mergedProps,
OAuth2Properties.TOKEN_REFRESH_ENABLED,
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
this.client = clientBuilder.apply(mergedProps);
this.paths = ResourcePaths.forCatalogProperties(mergedProps);

String token = mergedProps.get(OAuth2Properties.TOKEN);
this.catalogAuth =
new AuthSession(
baseHeaders,
AuthConfig.builder()
.credential(credential)
.scope(scope)
.oauth2ServerUri(oauth2ServerUri)
.optionalOAuthParams(optionalOAuthParams)
.build());
if (authResponse != null) {
this.catalogAuth =
AuthSession.fromTokenResponse(
client, tokenRefreshExecutor(name), authResponse, startTimeMillis, catalogAuth);
} else if (token != null) {
this.catalogAuth =
AuthSession.fromAccessToken(
client, tokenRefreshExecutor(name), token, expiresAtMillis(mergedProps), catalogAuth);
}
this.catalogAuth = authManager.newSession(client, mergedProps, baseHeaders);

this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
if (pageSize != null) {
Expand All @@ -249,6 +189,7 @@ public void initialize(String name, Map<String, String> unresolved) {
this.closeables = new CloseableGroup();
this.closeables.addCloseable(this.io);
this.closeables.addCloseable(this.client);
this.closeables.addCloseable(this.authManager);
this.closeables.setSuppressCloseFailure(true);

this.snapshotMode =
Expand All @@ -270,7 +211,8 @@ private AuthSession session(SessionContext context) {
context.sessionId(),
id -> {
Pair<String, Supplier<AuthSession>> newSession =
newSession(context.credentials(), context.properties(), catalogAuth);
authManager.newSessionSupplier(
context.credentials(), context.properties(), catalogAuth);
if (null != newSession) {
return newSession.second().get();
}
Expand Down Expand Up @@ -595,26 +537,8 @@ public boolean updateNamespaceMetadata(
return !response.updated().isEmpty();
}

private ScheduledExecutorService tokenRefreshExecutor(String catalogName) {
if (!keepTokenRefreshed) {
return null;
}

if (refreshExecutor == null) {
synchronized (this) {
if (refreshExecutor == null) {
this.refreshExecutor = ThreadPools.newScheduledPool(catalogName + "-token-refresh", 1);
}
}
}

return refreshExecutor;
}

@Override
public void close() throws IOException {
shutdownRefreshExecutor();

if (closeables != null) {
closeables.close();
}
Expand All @@ -625,30 +549,6 @@ public void close() throws IOException {
}
}

private void shutdownRefreshExecutor() {
if (refreshExecutor != null) {
ScheduledExecutorService service = refreshExecutor;
this.refreshExecutor = null;

List<Runnable> tasks = service.shutdownNow();
tasks.forEach(
task -> {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
});

try {
if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
LOG.warn("Timed out waiting for refresh executor to terminate");
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for refresh executor to terminate", e);
Thread.currentThread().interrupt();
}
}
}

private class Builder implements Catalog.TableBuilder {
private final TableIdentifier ident;
private final Schema schema;
Expand Down Expand Up @@ -922,7 +822,8 @@ private FileIO tableFileIO(SessionContext context, Map<String, String> config) {
}

private AuthSession tableSession(Map<String, String> tableConf, AuthSession parent) {
Pair<String, Supplier<AuthSession>> newSession = newSession(tableConf, tableConf, parent);
Pair<String, Supplier<AuthSession>> newSession =
authManager.newSessionSupplier(tableConf, tableConf, parent);
if (null == newSession) {
return parent;
}
Expand Down Expand Up @@ -957,66 +858,6 @@ private static ConfigResponse fetchConfig(
return configResponse;
}

private Pair<String, Supplier<AuthSession>> newSession(
Map<String, String> credentials, Map<String, String> properties, AuthSession parent) {
if (credentials != null) {
// use the bearer token without exchanging
if (credentials.containsKey(OAuth2Properties.TOKEN)) {
return Pair.of(
credentials.get(OAuth2Properties.TOKEN),
() ->
AuthSession.fromAccessToken(
client,
tokenRefreshExecutor(name()),
credentials.get(OAuth2Properties.TOKEN),
expiresAtMillis(properties),
parent));
}

if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) {
// fetch a token using the client credentials flow
return Pair.of(
credentials.get(OAuth2Properties.CREDENTIAL),
() ->
AuthSession.fromCredential(
client,
tokenRefreshExecutor(name()),
credentials.get(OAuth2Properties.CREDENTIAL),
parent));
}

for (String tokenType : TOKEN_PREFERENCE_ORDER) {
if (credentials.containsKey(tokenType)) {
// exchange the token for an access token using the token exchange flow
return Pair.of(
credentials.get(tokenType),
() ->
AuthSession.fromTokenExchange(
client,
tokenRefreshExecutor(name()),
credentials.get(tokenType),
tokenType,
parent));
}
}
}

return null;
}

private Long expiresAtMillis(Map<String, String> properties) {
if (properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
long expiresInMillis =
PropertyUtil.propertyAsLong(
properties,
OAuth2Properties.TOKEN_EXPIRES_IN_MS,
OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
return System.currentTimeMillis() + expiresInMillis;
} else {
return null;
}
}

private void checkIdentifierIsValid(TableIdentifier tableIdentifier) {
if (tableIdentifier.namespace().isEmpty()) {
throw new NoSuchTableException("Invalid table identifier: %s", tableIdentifier);
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/auth/AuthManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest.auth;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.util.Pair;

public interface AuthManager extends AutoCloseable {

Map<String, String> mergeAuthHeadersForGetConfig(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be improved. Here we are constructing an initial auth session so a better signature would be:

  AuthSession initialAuth(RESTClient initialAuthClient, Map<String, String> initialHeaders);

It's very easy to adapt the only call site of that method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adutra Isn't that the purpose of the newSession API?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah @jackye1995 I'm not sure there really needs to be a separate mergeAuthHeadersForGetConfig API. Feels like the newSession API should do all that in the implementation?

Copy link
Contributor

@adutra adutra Jul 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it's actually the opposite: we need more methods.

In my own exploration I came up with the following API which works well:

https://github.com/adutra/iceberg/blob/auth-manager-adutra/core/src/main/java/org/apache/iceberg/rest/auth/AuthManager.java

We need a different factory method for each "level" (or "scope") where an auth session could be introduced (and cached).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However I agree that the first method newInitialSession is really only meant to produce a short-lived, temporary auth session in order to call the config endpoint. It's discarded immediately after. But I think we need it and we can't use newCatalogSession in this case.

RESTClient initialAuthClient, Map<String, String> configuredHeaders);

AuthSession newSession(
RESTClient authClient, Map<String, String> mergedProps, Map<String, String> baseHeaders);

Pair<String, Supplier<AuthSession>> newSessionSupplier(
Map<String, String> credentials, Map<String, String> properties, AuthSession parent);

void initialize(String managerName, Map<String, String> properties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of having a managerName?

}
Loading