Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
* <p>See <a
* href="https://docs.aws.amazon.com/general/latest/gr/signing-aws-api-requests.html">Signing AWS
* API requests</a> for details about the protocol.
*
* @deprecated since 1.9.0, will be removed in 1.10.0; use {@link RESTSigV4AuthManager} instead.
*/
@Deprecated
public class RESTSigV4Signer implements HttpRequestInterceptor {
static final String EMPTY_BODY_SHA256 =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.HTTPHeaders;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.DefaultAuthSession;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
Expand All @@ -48,6 +47,8 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
private volatile HTTPClient client;
private final Map<String, String> properties;
private final CachedSupplier<AwsCredentials> credentialCache;
private AuthManager authManager;
private AuthSession authSession;

private VendedCredentialsProvider(Map<String, String> properties) {
Preconditions.checkArgument(null != properties, "Invalid properties: null");
Expand All @@ -66,8 +67,10 @@ public AwsCredentials resolveCredentials() {

@Override
public void close() {
IoUtils.closeQuietly(client, null);
credentialCache.close();
IoUtils.closeQuietlyV2(authSession, null);
IoUtils.closeQuietlyV2(authManager, null);
IoUtils.closeQuietlyV2(client, null);
IoUtils.closeQuietlyV2(credentialCache, null);
}

public static VendedCredentialsProvider create(Map<String, String> properties) {
Expand All @@ -78,14 +81,10 @@ private RESTClient httpClient() {
if (null == client) {
synchronized (this) {
if (null == client) {
DefaultAuthSession authSession =
DefaultAuthSession.of(
HTTPHeaders.of(OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN))));
client =
HTTPClient.builder(properties)
.uri(properties.get(URI))
.withAuthSession(authSession)
.build();
authManager = AuthManagers.loadAuthManager("s3-credentials-refresh", properties);
HTTPClient httpClient = HTTPClient.builder(properties).uri(properties.get(URI)).build();
authSession = authManager.catalogSession(httpClient, properties);
client = httpClient.withAuthSession(authSession);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand All @@ -42,13 +39,12 @@
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.auth.AuthConfig;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.immutables.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -64,7 +60,7 @@

@Value.Immutable
public abstract class S3V4RestSignerClient
extends AbstractAws4Signer<AwsS3V4SignerParams, Aws4PresignerParams> {
extends AbstractAws4Signer<AwsS3V4SignerParams, Aws4PresignerParams> implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(S3V4RestSignerClient.class);
public static final String S3_SIGNER_URI = "s3.signer.uri";
Expand All @@ -81,13 +77,14 @@ public abstract class S3V4RestSignerClient
private static final String SCOPE = "sign";

@SuppressWarnings("immutables:incompat")
private static volatile ScheduledExecutorService tokenRefreshExecutor;
private volatile AuthManager authManager;
Copy link
Contributor

@nastra nastra Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to my other comment from https://github.com/apache/iceberg/pull/12197/files#r1993040250 I currently don't know whether having this field being non-static would cause what was earlier tested in https://github.com/apache/iceberg/pull/12197/files#r1993040250. My current guess is that the code you currently have here will work, but we'll know for sure once we have the tests in place that I mentioned in the comment


@SuppressWarnings("immutables:incompat")
private static volatile RESTClient httpClient;
@SuppressWarnings({"immutables:incompat", "VisibilityModifier"})
@VisibleForTesting
static volatile RESTClient httpClient;

@SuppressWarnings("immutables:incompat")
private static volatile Cache<String, AuthSession> authSessionCache;
private volatile AuthSession authSession;

public abstract Map<String, String> properties();

Expand Down Expand Up @@ -138,52 +135,6 @@ boolean keepTokenRefreshed() {
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
}

@VisibleForTesting
ScheduledExecutorService tokenRefreshExecutor() {
if (!keepTokenRefreshed()) {
return null;
}

if (null == tokenRefreshExecutor) {
synchronized (S3V4RestSignerClient.class) {
if (null == tokenRefreshExecutor) {
tokenRefreshExecutor = ThreadPools.newScheduledPool("s3-signer-token-refresh", 1);
}
}
}

return tokenRefreshExecutor;
}

private Cache<String, AuthSession> authSessionCache() {
if (null == authSessionCache) {
synchronized (S3V4RestSignerClient.class) {
if (null == authSessionCache) {
long expirationIntervalMs =
PropertyUtil.propertyAsLong(
properties(),
CatalogProperties.AUTH_SESSION_TIMEOUT_MS,
CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT);

authSessionCache =
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofMillis(expirationIntervalMs))
.removalListener(
(RemovalListener<String, AuthSession>)
(id, auth, cause) -> {
if (null != auth) {
LOG.trace("Stopping refresh for AuthSession");
auth.stopRefreshing();
}
})
.build();
}
}
}

return authSessionCache;
}

private RESTClient httpClient() {
if (null == httpClient) {
synchronized (S3V4RestSignerClient.class) {
Expand All @@ -200,86 +151,40 @@ private RESTClient httpClient() {
return httpClient;
}

private AuthSession authSession() {
String token = token().get();
if (null != token) {
return authSessionCache()
.get(
token,
id -> {
// this client will be reused for token refreshes; it must contain an empty auth
// session in order to avoid interfering with refreshed tokens
RESTClient refreshClient =
httpClient().withAuthSession(org.apache.iceberg.rest.auth.AuthSession.EMPTY);
return AuthSession.fromAccessToken(
refreshClient,
tokenRefreshExecutor(),
token,
expiresAtMillis(properties()),
new AuthSession(
ImmutableMap.of(),
AuthConfig.builder()
.token(token)
.credential(credential())
.scope(SCOPE)
.oauth2ServerUri(oauth2ServerUri())
.optionalOAuthParams(optionalOAuthParams())
.build()));
});
}

if (credentialProvided()) {
return authSessionCache()
.get(
credential(),
id -> {
AuthSession session =
new AuthSession(
ImmutableMap.of(),
AuthConfig.builder()
.credential(credential())
.scope(SCOPE)
.oauth2ServerUri(oauth2ServerUri())
.optionalOAuthParams(optionalOAuthParams())
.build());
long startTimeMillis = System.currentTimeMillis();
// this client will be reused for token refreshes; it must contain an empty auth
// session in order to avoid interfering with refreshed tokens
RESTClient refreshClient =
httpClient().withAuthSession(org.apache.iceberg.rest.auth.AuthSession.EMPTY);
OAuthTokenResponse authResponse =
OAuth2Util.fetchToken(
refreshClient,
session.headers(),
credential(),
SCOPE,
oauth2ServerUri(),
optionalOAuthParams());
return AuthSession.fromTokenResponse(
refreshClient, tokenRefreshExecutor(), authResponse, startTimeMillis, session);
});
@VisibleForTesting
AuthSession authSession() {
if (null == authSession) {
synchronized (S3V4RestSignerClient.class) {
if (null == authSession) {
authManager = AuthManagers.loadAuthManager("s3-signer", properties());
ImmutableMap.Builder<String, String> properties =
ImmutableMap.<String, String>builder()
.putAll(properties())
.putAll(optionalOAuthParams())
.put(OAuth2Properties.OAUTH2_SERVER_URI, oauth2ServerUri())
.put(OAuth2Properties.TOKEN_REFRESH_ENABLED, String.valueOf(keepTokenRefreshed()))
.put(OAuth2Properties.SCOPE, SCOPE);
String token = token().get();
if (null != token) {
properties.put(OAuth2Properties.TOKEN, token);
}

if (credentialProvided()) {
properties.put(OAuth2Properties.CREDENTIAL, credential());
}

authSession = authManager.tableSession(httpClient(), properties.buildKeepingLast());
}
}
}

return AuthSession.empty();
return authSession;
}

private boolean credentialProvided() {
return null != credential() && !credential().isEmpty();
}

private Long expiresAtMillis(Map<String, String> properties) {
if (properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
long expiresInMillis =
PropertyUtil.propertyAsLong(
properties,
OAuth2Properties.TOKEN_EXPIRES_IN_MS,
OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
return System.currentTimeMillis() + expiresInMillis;
} else {
return null;
}
}

@Value.Check
protected void check() {
Preconditions.checkArgument(
Expand Down Expand Up @@ -377,6 +282,12 @@ public SdkHttpFullRequest sign(
return mutableRequest.build();
}

@Override
public void close() throws Exception {
IoUtils.closeQuietlyV2(authSession, null);
IoUtils.closeQuietlyV2(authManager, null);
}

/**
* Only add body for DeleteObjectsRequest. Refer to
* https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestSyntax
Expand Down
Loading
Loading