diff --git a/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthManager.java b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthManager.java
new file mode 100644
index 000000000000..061a6afe549d
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Map;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.auth.AuthManager;
+import org.apache.iceberg.rest.auth.AuthSession;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+
+/**
+ * An AuthManager that authenticates requests with SigV4.
+ *
+ *
It takes a delegate AuthManager to handle double authentication cases, e.g. on top of OAuth2.
+ */
+@SuppressWarnings("unused") // loaded by reflection
+public class RESTSigV4AuthManager implements AuthManager {
+
+ private final Aws4Signer signer = Aws4Signer.create();
+ private final AuthManager delegate;
+
+ private Map catalogProperties = Map.of();
+
+ public RESTSigV4AuthManager(String name, AuthManager delegate) {
+ this.delegate = Preconditions.checkNotNull(delegate, "Invalid delegate: null");
+ }
+
+ @Override
+ public RESTSigV4AuthSession initSession(RESTClient initClient, Map properties) {
+ return new RESTSigV4AuthSession(
+ signer, delegate.initSession(initClient, properties), new AwsProperties(properties));
+ }
+
+ @Override
+ public RESTSigV4AuthSession catalogSession(
+ RESTClient sharedClient, Map properties) {
+ this.catalogProperties = properties;
+ AwsProperties awsProperties = new AwsProperties(catalogProperties);
+ return new RESTSigV4AuthSession(
+ signer, delegate.catalogSession(sharedClient, catalogProperties), awsProperties);
+ }
+
+ @Override
+ public AuthSession contextualSession(SessionCatalog.SessionContext context, AuthSession parent) {
+ AwsProperties contextProperties =
+ new AwsProperties(RESTUtil.merge(catalogProperties, context.properties()));
+ return new RESTSigV4AuthSession(
+ signer, delegate.contextualSession(context, parent), contextProperties);
+ }
+
+ @Override
+ public AuthSession tableSession(
+ TableIdentifier table, Map properties, AuthSession parent) {
+ AwsProperties tableProperties =
+ new AwsProperties(RESTUtil.merge(catalogProperties, properties));
+ return new RESTSigV4AuthSession(
+ signer, delegate.tableSession(table, properties, parent), tableProperties);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthSession.java b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthSession.java
new file mode 100644
index 000000000000..f2e66cfb755d
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthSession.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.HTTPHeaders;
+import org.apache.iceberg.rest.HTTPHeaders.HTTPHeader;
+import org.apache.iceberg.rest.HTTPRequest;
+import org.apache.iceberg.rest.ImmutableHTTPHeaders;
+import org.apache.iceberg.rest.ImmutableHTTPRequest;
+import org.apache.iceberg.rest.auth.AuthSession;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+import software.amazon.awssdk.auth.signer.internal.SignerConstant;
+import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
+import software.amazon.awssdk.auth.signer.params.SignerChecksumParams;
+import software.amazon.awssdk.core.checksums.Algorithm;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * An AuthSession that signs requests with SigV4.
+ *
+ * The request is first authenticated by the delegate AuthSession, then signed with SigV4. In
+ * case of conflicting headers, the Authorization header set by delegate AuthSession will be
+ * relocated, then included in the canonical headers to sign.
+ *
+ *
See Signing AWS
+ * API requests for details about the SigV4 protocol.
+ */
+public class RESTSigV4AuthSession implements AuthSession {
+
+ static final String EMPTY_BODY_SHA256 =
+ "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+ static final String RELOCATED_HEADER_PREFIX = "Original-";
+
+ private final Aws4Signer signer;
+ private final AuthSession delegate;
+ private final Region signingRegion;
+ private final String signingName;
+ private final AwsCredentialsProvider credentialsProvider;
+
+ public RESTSigV4AuthSession(
+ Aws4Signer aws4Signer, AuthSession delegateAuthSession, AwsProperties awsProperties) {
+ this.signer = Preconditions.checkNotNull(aws4Signer, "Invalid signer: null");
+ this.delegate = Preconditions.checkNotNull(delegateAuthSession, "Invalid delegate: null");
+ Preconditions.checkNotNull(awsProperties, "Invalid AWS properties: null");
+ this.signingRegion = awsProperties.restSigningRegion();
+ this.signingName = awsProperties.restSigningName();
+ this.credentialsProvider = awsProperties.restCredentialsProvider();
+ }
+
+ @Override
+ public HTTPRequest authenticate(HTTPRequest request) {
+ return sign(delegate.authenticate(request));
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ private HTTPRequest sign(HTTPRequest request) {
+ Aws4SignerParams params =
+ Aws4SignerParams.builder()
+ .signingName(signingName)
+ .signingRegion(signingRegion)
+ .awsCredentials(credentialsProvider.resolveCredentials())
+ .checksumParams(
+ SignerChecksumParams.builder()
+ .algorithm(Algorithm.SHA256)
+ .isStreamingRequest(false)
+ .checksumHeaderName(SignerConstant.X_AMZ_CONTENT_SHA256)
+ .build())
+ .build();
+
+ SdkHttpFullRequest.Builder sdkRequestBuilder = SdkHttpFullRequest.builder();
+
+ URI uri = request.requestUri();
+ sdkRequestBuilder
+ .method(SdkHttpMethod.fromValue(request.method().name()))
+ .protocol(uri.getScheme())
+ .uri(uri)
+ .headers(convertHeaders(request.headers()));
+
+ String body = request.encodedBody();
+ if (body == null) {
+ // This is a workaround for the signer implementation incorrectly producing
+ // an invalid content checksum for empty body requests.
+ sdkRequestBuilder.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256, EMPTY_BODY_SHA256);
+ } else {
+ sdkRequestBuilder.contentStreamProvider(
+ () -> IOUtils.toInputStream(body, StandardCharsets.UTF_8));
+ }
+
+ SdkHttpFullRequest signedSdkRequest = signer.sign(sdkRequestBuilder.build(), params);
+ HTTPHeaders newHeaders = updateRequestHeaders(request.headers(), signedSdkRequest.headers());
+ return ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build();
+ }
+
+ private Map> convertHeaders(HTTPHeaders headers) {
+ return headers.entries().stream()
+ .collect(
+ Collectors.groupingBy(
+ // Relocate Authorization header as SigV4 takes precedence
+ header ->
+ header.name().equalsIgnoreCase("Authorization")
+ ? RELOCATED_HEADER_PREFIX + header.name()
+ : header.name(),
+ Collectors.mapping(HTTPHeader::value, Collectors.toList())));
+ }
+
+ private HTTPHeaders updateRequestHeaders(
+ HTTPHeaders originalHeaders, Map> signedHeaders) {
+ ImmutableHTTPHeaders.Builder newHeaders = ImmutableHTTPHeaders.builder();
+ signedHeaders.forEach(
+ (name, signedValues) -> {
+ if (originalHeaders.contains(name)) {
+ for (HTTPHeader originalHeader : originalHeaders.entries(name)) {
+ // Relocate headers if there is a conflict with signed headers
+ if (!signedValues.contains(originalHeader.value())) {
+ newHeaders.addEntry(
+ HTTPHeader.of(RELOCATED_HEADER_PREFIX + name, originalHeader.value()));
+ }
+ }
+ }
+
+ signedValues.forEach(value -> newHeaders.addEntry(HTTPHeader.of(name, value)));
+ });
+
+ return newHeaders.build();
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthManager.java b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthManager.java
new file mode 100644
index 000000000000..68df9e2d1c7f
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthManager.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthManager;
+import org.apache.iceberg.rest.auth.AuthManagers;
+import org.apache.iceberg.rest.auth.AuthProperties;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.auth.NoopAuthManager;
+import org.apache.iceberg.rest.auth.OAuth2Manager;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class TestRESTSigV4AuthManager {
+
+ private final Map awsProperties =
+ Map.of(
+ // CI environment doesn't have credentials, but a value must be set for signing
+ AwsProperties.REST_SIGNER_REGION,
+ "us-west-2",
+ AwsProperties.REST_ACCESS_KEY_ID,
+ "id",
+ AwsProperties.REST_SECRET_ACCESS_KEY,
+ "secret");
+
+ @Test
+ void create() {
+ AuthManager manager =
+ AuthManagers.loadAuthManager(
+ "test", Map.of(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_SIGV4));
+ assertThat(manager)
+ .isInstanceOf(RESTSigV4AuthManager.class)
+ .extracting("delegate")
+ .isInstanceOf(OAuth2Manager.class);
+ }
+
+ @Test
+ void createLegacy() {
+ AuthManager manager =
+ AuthManagers.loadAuthManager("test", Map.of("rest.sigv4-enabled", "true"));
+ assertThat(manager)
+ .isInstanceOf(RESTSigV4AuthManager.class)
+ .extracting("delegate")
+ .isInstanceOf(OAuth2Manager.class);
+ }
+
+ @Test
+ void createCustomDelegate() {
+ AuthManager manager =
+ AuthManagers.loadAuthManager(
+ "test",
+ Map.of(
+ AuthProperties.AUTH_TYPE,
+ AuthProperties.AUTH_TYPE_SIGV4,
+ AuthProperties.SIGV4_DELEGATE_AUTH_TYPE,
+ AuthProperties.AUTH_TYPE_NONE));
+ assertThat(manager)
+ .isInstanceOf(RESTSigV4AuthManager.class)
+ .extracting("delegate")
+ .isInstanceOf(NoopAuthManager.class);
+ }
+
+ @Test
+ @SuppressWarnings("resource")
+ void createInvalidCustomDelegate() {
+ assertThatThrownBy(
+ () ->
+ AuthManagers.loadAuthManager(
+ "test",
+ Map.of(
+ AuthProperties.AUTH_TYPE,
+ AuthProperties.AUTH_TYPE_SIGV4,
+ AuthProperties.SIGV4_DELEGATE_AUTH_TYPE,
+ AuthProperties.AUTH_TYPE_SIGV4)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot delegate a SigV4 auth manager to another SigV4 auth manager");
+ }
+
+ @Test
+ void initSession() {
+ AuthManager delegate = Mockito.mock(AuthManager.class);
+ when(delegate.initSession(any(), any())).thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
+ RESTClient client = Mockito.mock(RESTClient.class);
+ AuthManager manager = new RESTSigV4AuthManager("test", delegate);
+ AuthSession authSession = manager.initSession(client, awsProperties);
+ assertThat(authSession)
+ .isInstanceOf(RESTSigV4AuthSession.class)
+ .extracting("delegate")
+ .isInstanceOf(OAuth2Util.AuthSession.class);
+ }
+
+ @Test
+ void catalogSession() {
+ AuthManager delegate = Mockito.mock(AuthManager.class);
+ when(delegate.catalogSession(any(), any()))
+ .thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
+ RESTClient client = Mockito.mock(RESTClient.class);
+ AuthManager manager = new RESTSigV4AuthManager("test", delegate);
+ AuthSession authSession = manager.catalogSession(client, awsProperties);
+ assertThat(authSession)
+ .isInstanceOf(RESTSigV4AuthSession.class)
+ .extracting("delegate")
+ .isInstanceOf(OAuth2Util.AuthSession.class);
+ }
+
+ @Test
+ void contextualSession() {
+ AuthManager delegate = Mockito.mock(AuthManager.class);
+ when(delegate.catalogSession(any(), any()))
+ .thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
+ when(delegate.contextualSession(any(), any()))
+ .thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
+ AuthManager manager = new RESTSigV4AuthManager("test", delegate);
+ manager.catalogSession(Mockito.mock(HTTPClient.class), awsProperties);
+ AuthSession authSession =
+ manager.contextualSession(
+ Mockito.mock(SessionCatalog.SessionContext.class), Mockito.mock(AuthSession.class));
+ assertThat(authSession)
+ .isInstanceOf(RESTSigV4AuthSession.class)
+ .extracting("delegate")
+ .isInstanceOf(OAuth2Util.AuthSession.class);
+ }
+
+ @Test
+ void tableSession() {
+ AuthManager delegate = Mockito.mock(AuthManager.class);
+ when(delegate.catalogSession(any(), any()))
+ .thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
+ when(delegate.tableSession(any(), any(), any()))
+ .thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
+ AuthManager manager = new RESTSigV4AuthManager("test", delegate);
+ manager.catalogSession(Mockito.mock(HTTPClient.class), awsProperties);
+ AuthSession authSession =
+ manager.tableSession(
+ Mockito.mock(TableIdentifier.class), Map.of(), Mockito.mock(AuthSession.class));
+ assertThat(authSession)
+ .isInstanceOf(RESTSigV4AuthSession.class)
+ .extracting("delegate")
+ .isInstanceOf(OAuth2Util.AuthSession.class);
+ }
+
+ @Test
+ void close() {
+ AuthManager delegate = Mockito.mock(AuthManager.class);
+ AuthManager manager = new RESTSigV4AuthManager("test", delegate);
+ manager.close();
+ Mockito.verify(delegate).close();
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthSession.java b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthSession.java
new file mode 100644
index 000000000000..1b2aaf2e1c01
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthSession.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.HTTPHeaders;
+import org.apache.iceberg.rest.HTTPHeaders.HTTPHeader;
+import org.apache.iceberg.rest.HTTPRequest;
+import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
+import org.apache.iceberg.rest.ImmutableHTTPRequest;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.auth.signer.Aws4Signer;
+
+class TestRESTSigV4AuthSession {
+
+ private final Aws4Signer signer = Aws4Signer.create();
+
+ private final AwsProperties awsProperties =
+ new AwsProperties(
+ Map.of(
+ // CI environment doesn't have credentials, but a value must be set for signing
+ AwsProperties.REST_SIGNER_REGION,
+ "us-west-2",
+ AwsProperties.REST_ACCESS_KEY_ID,
+ "id",
+ AwsProperties.REST_SECRET_ACCESS_KEY,
+ "secret"));
+
+ @Test
+ void nullArguments() {
+ AuthSession delegate = Mockito.mock(AuthSession.class);
+ assertThatThrownBy(() -> new RESTSigV4AuthSession(null, delegate, awsProperties))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid signer: null");
+ assertThatThrownBy(() -> new RESTSigV4AuthSession(signer, null, awsProperties))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid delegate: null");
+ assertThatThrownBy(() -> new RESTSigV4AuthSession(signer, delegate, null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid AWS properties: null");
+ }
+
+ @Test
+ void authenticateWithoutBody() {
+ HTTPRequest request = Mockito.mock(HTTPRequest.class);
+ AuthSession delegate = Mockito.mock(AuthSession.class);
+ when(delegate.authenticate(any()))
+ .thenReturn(
+ ImmutableHTTPRequest.builder()
+ .method(HTTPMethod.GET)
+ .baseUri(URI.create("http://localhost:8080"))
+ .path("path")
+ .headers(
+ HTTPHeaders.of(
+ HTTPHeader.of("Content-Type", "application/json"),
+ HTTPHeader.of("Content-Encoding", "gzip")))
+ .build());
+ try (RESTSigV4AuthSession session = new RESTSigV4AuthSession(signer, delegate, awsProperties)) {
+ HTTPRequest actual = session.authenticate(request);
+ assertThat(actual.headers().entries())
+ .hasSize(6)
+ // original headers
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Content-Type");
+ assertThat(header.value()).isEqualTo("application/json");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Content-Encoding");
+ assertThat(header.value()).isEqualTo("gzip");
+ })
+ // generated by the signer
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Host");
+ assertThat(header.value()).isEqualTo("localhost:8080");
+ })
+ // Sigv4 headers
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Authorization");
+ assertThat(header.value()).startsWith("AWS4-HMAC-SHA256 Credential=");
+ assertThat(header.value())
+ .contains(
+ "SignedHeaders=content-encoding;content-type;host;x-amz-content-sha256;x-amz-date");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("x-amz-content-sha256");
+ assertThat(header.value())
+ .isEqualTo("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
+ })
+ .satisfiesOnlyOnce(header -> assertThat(header.name()).isEqualTo("X-Amz-Date"));
+ }
+ }
+
+ @Test
+ void authenticateWithBody() {
+ HTTPRequest request = Mockito.mock(HTTPRequest.class);
+ AuthSession delegate = Mockito.mock(AuthSession.class);
+ when(delegate.authenticate(any()))
+ .thenReturn(
+ ImmutableHTTPRequest.builder()
+ .method(HTTPMethod.POST)
+ .baseUri(URI.create("http://localhost:8080"))
+ .path("path")
+ .headers(
+ HTTPHeaders.of(
+ HTTPHeader.of("Content-Type", "application/x-www-form-urlencoded"),
+ HTTPHeader.of("Content-Encoding", "gzip")))
+ .body(CreateNamespaceRequest.builder().withNamespace(Namespace.of("ns")).build())
+ .build());
+ try (RESTSigV4AuthSession session = new RESTSigV4AuthSession(signer, delegate, awsProperties)) {
+ HTTPRequest actual = session.authenticate(request);
+ assertThat(actual.headers().entries())
+ .hasSize(6)
+ // original headers
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Content-Type");
+ assertThat(header.value()).isEqualTo("application/x-www-form-urlencoded");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Content-Encoding");
+ assertThat(header.value()).isEqualTo("gzip");
+ })
+ // generated by the signer
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Host");
+ assertThat(header.value()).isEqualTo("localhost:8080");
+ })
+ // Sigv4 headers
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Authorization");
+ assertThat(header.value()).startsWith("AWS4-HMAC-SHA256 Credential=");
+ assertThat(header.value())
+ .contains(
+ "SignedHeaders=content-encoding;content-type;host;x-amz-content-sha256;x-amz-date");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("x-amz-content-sha256");
+ assertThat(header.value())
+ .isEqualTo("yc5oAKPWjHY4sW8XQq0l/3aNrrXJKBycVFNnDEGMfww=");
+ })
+ .satisfiesOnlyOnce(header -> assertThat(header.name()).isEqualTo("X-Amz-Date"));
+ }
+ }
+
+ @Test
+ void authenticateConflictingAuthorizationHeader() {
+ HTTPRequest request = Mockito.mock(HTTPRequest.class);
+ AuthSession delegate = Mockito.mock(AuthSession.class);
+ when(delegate.authenticate(any()))
+ .thenReturn(
+ ImmutableHTTPRequest.builder()
+ .method(HTTPMethod.GET)
+ .baseUri(URI.create("http://localhost:8080"))
+ .path("path")
+ .headers(
+ HTTPHeaders.of(
+ HTTPHeader.of("Content-Type", "application/json"),
+ HTTPHeader.of("Authorization", "Bearer token")))
+ .build());
+ try (RESTSigV4AuthSession session = new RESTSigV4AuthSession(signer, delegate, awsProperties)) {
+ HTTPRequest actual = session.authenticate(request);
+ assertThat(actual.headers().entries())
+ .hasSize(6)
+ // original header
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Content-Type");
+ assertThat(header.value()).isEqualTo("application/json");
+ })
+ // relocated before signing
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Original-Authorization");
+ assertThat(header.value()).isEqualTo("Bearer token");
+ })
+ // generated by the signer
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Host");
+ assertThat(header.value()).isEqualTo("localhost:8080");
+ })
+ // Sigv4 headers
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Authorization");
+ assertThat(header.value()).startsWith("AWS4-HMAC-SHA256 Credential=");
+ assertThat(header.value())
+ .contains(
+ "SignedHeaders=content-type;host;original-authorization;x-amz-content-sha256;x-amz-date");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("x-amz-content-sha256");
+ assertThat(header.value())
+ .isEqualTo("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
+ })
+ .satisfiesOnlyOnce(header -> assertThat(header.name()).isEqualTo("X-Amz-Date"));
+ }
+ }
+
+ @Test
+ void authenticateConflictingSigv4Headers() {
+ HTTPRequest request = Mockito.mock(HTTPRequest.class);
+ AuthSession delegate = Mockito.mock(AuthSession.class);
+ when(delegate.authenticate(any()))
+ .thenReturn(
+ ImmutableHTTPRequest.builder()
+ .method(HTTPMethod.GET)
+ .baseUri(URI.create("http://localhost:8080"))
+ .path("path")
+ .headers(
+ HTTPHeaders.of(
+ HTTPHeader.of("Content-Type", "application/json"),
+ HTTPHeader.of("x-amz-content-sha256", "fake"),
+ // corner case: conflicting header with same value as generated by the
+ // signer: will not be relocated
+ HTTPHeader.of(
+ "x-amz-content-sha256",
+ "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"),
+ HTTPHeader.of("X-Amz-Date", "fake")))
+ .build());
+ try (RESTSigV4AuthSession session = new RESTSigV4AuthSession(signer, delegate, awsProperties)) {
+ HTTPRequest actual = session.authenticate(request);
+ assertThat(actual.headers().entries())
+ .hasSize(7)
+ // original header
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Content-Type");
+ assertThat(header.value()).isEqualTo("application/json");
+ })
+ // relocated after signing
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Original-x-amz-content-sha256");
+ assertThat(header.value()).isEqualTo("fake");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Original-X-Amz-Date");
+ assertThat(header.value()).isEqualTo("fake");
+ })
+ // generated by the signer
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Host");
+ assertThat(header.value()).isEqualTo("localhost:8080");
+ })
+ // Sigv4 headers
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("Authorization");
+ assertThat(header.value()).startsWith("AWS4-HMAC-SHA256 Credential=");
+ assertThat(header.value())
+ .contains("SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date");
+ })
+ .satisfiesOnlyOnce(
+ header -> {
+ assertThat(header.name()).isEqualTo("x-amz-content-sha256");
+ assertThat(header.value())
+ .isEqualTo("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
+ })
+ .satisfiesOnlyOnce(header -> assertThat(header.name()).isEqualTo("X-Amz-Date"));
+ }
+ }
+
+ @Test
+ void close() {
+ AuthSession delegate = Mockito.mock(AuthSession.class);
+ RESTSigV4AuthSession session = new RESTSigV4AuthSession(signer, delegate, awsProperties);
+ session.close();
+ Mockito.verify(delegate).close();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java b/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java
index 46188c1281c5..331ee5268145 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/AuthManagers.java
@@ -21,6 +21,9 @@
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,27 +31,59 @@ public class AuthManagers {
private static final Logger LOG = LoggerFactory.getLogger(AuthManagers.class);
+ /** Old property name for enabling SigV4 authentication. */
+ private static final String SIGV4_ENABLED_LEGACY = "rest.sigv4-enabled";
+
private AuthManagers() {}
public static AuthManager loadAuthManager(String name, Map properties) {
- String authType = properties.get(AuthProperties.AUTH_TYPE);
- if (authType == null) {
- boolean hasCredential = properties.containsKey(OAuth2Properties.CREDENTIAL);
- boolean hasToken = properties.containsKey(OAuth2Properties.TOKEN);
- if (hasCredential || hasToken) {
- LOG.warn(
- "Inferring {}={} since property {} was provided. "
- + "Please explicitly set {} to avoid this warning.",
- AuthProperties.AUTH_TYPE,
- AuthProperties.AUTH_TYPE_OAUTH2,
- hasCredential ? OAuth2Properties.CREDENTIAL : OAuth2Properties.TOKEN,
- AuthProperties.AUTH_TYPE);
- authType = AuthProperties.AUTH_TYPE_OAUTH2;
- } else {
- authType = AuthProperties.AUTH_TYPE_NONE;
+ if (properties.containsKey(SIGV4_ENABLED_LEGACY)) {
+ LOG.warn(
+ "The property {} is deprecated and will be removed in a future release. "
+ + "Please use the property {}={} instead.",
+ SIGV4_ENABLED_LEGACY,
+ AuthProperties.AUTH_TYPE,
+ AuthProperties.AUTH_TYPE_SIGV4);
+ }
+
+ String authType;
+ if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED_LEGACY, false)) {
+ authType = AuthProperties.AUTH_TYPE_SIGV4;
+ } else {
+ authType = properties.get(AuthProperties.AUTH_TYPE);
+ if (authType == null) {
+ boolean hasCredential = properties.containsKey(OAuth2Properties.CREDENTIAL);
+ boolean hasToken = properties.containsKey(OAuth2Properties.TOKEN);
+ if (hasCredential || hasToken) {
+ LOG.warn(
+ "Inferring {}={} since property {} was provided. "
+ + "Please explicitly set {} to avoid this warning.",
+ AuthProperties.AUTH_TYPE,
+ AuthProperties.AUTH_TYPE_OAUTH2,
+ hasCredential ? OAuth2Properties.CREDENTIAL : OAuth2Properties.TOKEN,
+ AuthProperties.AUTH_TYPE);
+ authType = AuthProperties.AUTH_TYPE_OAUTH2;
+ } else {
+ authType = AuthProperties.AUTH_TYPE_NONE;
+ }
}
}
+ AuthManager delegate = null;
+ if (authType.equals(AuthProperties.AUTH_TYPE_SIGV4)) {
+ String delegateAuthType =
+ properties.getOrDefault(
+ AuthProperties.SIGV4_DELEGATE_AUTH_TYPE,
+ AuthProperties.SIGV4_DELEGATE_AUTH_TYPE_DEFAULT);
+ Preconditions.checkArgument(
+ !AuthProperties.AUTH_TYPE_SIGV4.equals(delegateAuthType),
+ "Cannot delegate a SigV4 auth manager to another SigV4 auth manager");
+ Map newProperties = Maps.newHashMap(properties);
+ newProperties.put(AuthProperties.AUTH_TYPE, delegateAuthType);
+ newProperties.remove(SIGV4_ENABLED_LEGACY);
+ delegate = loadAuthManager(name, newProperties);
+ }
+
String impl;
switch (authType.toLowerCase(Locale.ROOT)) {
case AuthProperties.AUTH_TYPE_NONE:
@@ -57,6 +92,9 @@ public static AuthManager loadAuthManager(String name, Map prope
case AuthProperties.AUTH_TYPE_BASIC:
impl = AuthProperties.AUTH_MANAGER_IMPL_BASIC;
break;
+ case AuthProperties.AUTH_TYPE_SIGV4:
+ impl = AuthProperties.AUTH_MANAGER_IMPL_SIGV4;
+ break;
case AuthProperties.AUTH_TYPE_OAUTH2:
impl = AuthProperties.AUTH_MANAGER_IMPL_OAUTH2;
break;
@@ -71,6 +109,7 @@ public static AuthManager loadAuthManager(String name, Map prope
DynConstructors.builder(AuthManager.class)
.loader(AuthManagers.class.getClassLoader())
.impl(impl, String.class) // with name
+ .impl(impl, String.class, AuthManager.class) // with name and delegate
.buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
@@ -81,7 +120,7 @@ public static AuthManager loadAuthManager(String name, Map prope
AuthManager authManager;
try {
- authManager = ctor.newInstance(name);
+ authManager = ctor.newInstance(name, delegate);
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize AuthManager, %s does not implement AuthManager", impl),
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/AuthProperties.java b/core/src/main/java/org/apache/iceberg/rest/auth/AuthProperties.java
index a4ba2db586a7..65df6493a3e5 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/AuthProperties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/AuthProperties.java
@@ -27,6 +27,7 @@ private AuthProperties() {}
public static final String AUTH_TYPE_NONE = "none";
public static final String AUTH_TYPE_BASIC = "basic";
public static final String AUTH_TYPE_OAUTH2 = "oauth2";
+ public static final String AUTH_TYPE_SIGV4 = "sigv4";
public static final String AUTH_MANAGER_IMPL_NONE =
"org.apache.iceberg.rest.auth.NoopAuthManager";
@@ -34,7 +35,12 @@ private AuthProperties() {}
"org.apache.iceberg.rest.auth.BasicAuthManager";
public static final String AUTH_MANAGER_IMPL_OAUTH2 =
"org.apache.iceberg.rest.auth.OAuth2Manager";
+ public static final String AUTH_MANAGER_IMPL_SIGV4 =
+ "org.apache.iceberg.aws.RESTSigV4AuthManager";
public static final String BASIC_USERNAME = "rest.auth.basic.username";
public static final String BASIC_PASSWORD = "rest.auth.basic.password";
+
+ public static final String SIGV4_DELEGATE_AUTH_TYPE = "rest.auth.sigv4.delegate-auth-type";
+ public static final String SIGV4_DELEGATE_AUTH_TYPE_DEFAULT = AUTH_TYPE_OAUTH2;
}