Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.util.Map;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.OAuth2Manager;
import org.apache.iceberg.rest.auth.OAuth2Util;

/**
* An AuthManager that authenticates requests with SigV4.
*
* <p>It extends {@link OAuth2Manager} to handle OAuth2 authentication as well. In case of
* conflicting headers, the OAuth2 Authorization header will be relocated, then included in the
* canonical headers to sign.
*
* <p>See <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_sigv.html">Signing AWS
* API requests</a> for details about the SigV4 protocol.
*/
@SuppressWarnings("unused") // loaded by reflection
public class RESTSigV4AuthManager extends OAuth2Manager {

private RESTSigV4Signer signer;

public RESTSigV4AuthManager(String name) {
super(name);
}

@Override
public RESTSigv4AuthSession initSession(RESTClient initClient, Map<String, String> properties) {
RESTSigV4Signer initSigner = new RESTSigV4Signer(properties);
return new RESTSigv4AuthSession(super.initSession(initClient, properties), initSigner);
}

@Override
public RESTSigv4AuthSession catalogSession(
RESTClient sharedClient, Map<String, String> properties) {
signer = new RESTSigV4Signer(properties);
return new RESTSigv4AuthSession(super.catalogSession(sharedClient, properties), signer);
}

@Override
protected RESTSigv4AuthSession newSessionFromAccessToken(
String token, Map<String, String> properties, OAuth2Util.AuthSession parent) {
return new RESTSigv4AuthSession(
super.newSessionFromAccessToken(token, properties, parent), signer);
}

@Override
protected RESTSigv4AuthSession newSessionFromCredential(
String credential, OAuth2Util.AuthSession parent) {
return new RESTSigv4AuthSession(super.newSessionFromCredential(credential, parent), signer);
}

@Override
protected RESTSigv4AuthSession newSessionFromTokenExchange(
String token, String tokenType, OAuth2Util.AuthSession parent) {
return new RESTSigv4AuthSession(
super.newSessionFromTokenExchange(token, tokenType, parent), signer);
}
}
143 changes: 72 additions & 71 deletions aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@
*/
package org.apache.iceberg.aws;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.commons.io.IOUtils;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.HTTPRequest;
import org.apache.iceberg.rest.ImmutableHTTPRequest;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.internal.SignerConstant;
Expand All @@ -45,42 +39,32 @@
import software.amazon.awssdk.regions.Region;

/**
* Provides a request interceptor for use with the HTTPClient that calculates the required signature
* for the SigV4 protocol and adds the necessary headers for all requests created by the client.
* A SigV4 signer that calculates the required signature for the SigV4 protocol and adds the
* necessary headers for all requests created by the client.
*
* <p>See <a
* href="https://docs.aws.amazon.com/general/latest/gr/signing-aws-api-requests.html">Signing AWS
* <p>See <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_sigv.html">Signing AWS
* API requests</a> for details about the protocol.
*/
public class RESTSigV4Signer implements HttpRequestInterceptor {
public class RESTSigV4Signer {
static final String EMPTY_BODY_SHA256 =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
static final String RELOCATED_HEADER_PREFIX = "Original-";

private final Aws4Signer signer = Aws4Signer.create();
private AwsCredentialsProvider credentialsProvider;
private final AwsCredentialsProvider credentialsProvider;

private String signingName;
private Region signingRegion;
private final String signingName;
private final Region signingRegion;

public void initialize(Map<String, String> properties) {
public RESTSigV4Signer(Map<String, String> properties) {
AwsProperties awsProperties = new AwsProperties(properties);

this.signingRegion = awsProperties.restSigningRegion();
this.signingName = awsProperties.restSigningName();
this.credentialsProvider = awsProperties.restCredentialsProvider();
}

@Override
public void process(HttpRequest request, EntityDetails entity, HttpContext context) {
URI requestUri;

try {
requestUri = request.getUri();
} catch (URISyntaxException e) {
throw new RESTException(e, "Invalid uri for request: %s", request);
}

public HTTPRequest sign(HTTPRequest request) {
Aws4SignerParams params =
Aws4SignerParams.builder()
.signingName(signingName)
Expand All @@ -96,62 +80,79 @@ public void process(HttpRequest request, EntityDetails entity, HttpContext conte

SdkHttpFullRequest.Builder sdkRequestBuilder = SdkHttpFullRequest.builder();

URI uri = request.requestUri();
sdkRequestBuilder
.method(SdkHttpMethod.fromValue(request.getMethod()))
.protocol(request.getScheme())
.uri(requestUri)
.headers(convertHeaders(request.getHeaders()));
.method(SdkHttpMethod.fromValue(request.method().name()))
.protocol(uri.getScheme())
.uri(uri)
.headers(convertHeaders(request.headers()));

if (entity == null) {
String body = request.encodedBody();
if (body == null) {
// This is a workaround for the signer implementation incorrectly producing
// an invalid content checksum for empty body requests.
sdkRequestBuilder.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256, EMPTY_BODY_SHA256);
} else if (entity instanceof StringEntity) {
sdkRequestBuilder.contentStreamProvider(
() -> {
try {
return ((StringEntity) entity).getContent();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
} else {
throw new UnsupportedOperationException("Unsupported entity type: " + entity.getClass());
sdkRequestBuilder.contentStreamProvider(
() -> IOUtils.toInputStream(body, StandardCharsets.UTF_8));
}

SdkHttpFullRequest signedSdkRequest = signer.sign(sdkRequestBuilder.build(), params);
updateRequestHeaders(request, signedSdkRequest.headers());
Map<String, List<String>> newHeaders =
updateRequestHeaders(request, signedSdkRequest.headers());
return ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build();
}

private Map<String, List<String>> convertHeaders(Header[] headers) {
return Arrays.stream(headers)
.collect(
Collectors.groupingBy(
// Relocate Authorization header as SigV4 takes precedence
header ->
HttpHeaders.AUTHORIZATION.equals(header.getName())
? RELOCATED_HEADER_PREFIX + header.getName()
: header.getName(),
Collectors.mapping(Header::getValue, Collectors.toList())));
}

private void updateRequestHeaders(HttpRequest request, Map<String, List<String>> headers) {
private Map<String, List<String>> convertHeaders(Map<String, List<String>> headers) {
Map<String, List<String>> converted = Maps.newHashMap();
headers.forEach(
(name, values) -> {
if (name.equals(HttpHeaders.AUTHORIZATION)) {
converted.merge(
RELOCATED_HEADER_PREFIX + name,
values,
(v1, v2) -> {
List<String> merged = Lists.newArrayList(v1);
merged.addAll(v2);
return List.copyOf(merged);
});
} else {
converted.put(name, values);
}
});
return converted;
}

private Map<String, List<String>> updateRequestHeaders(
HTTPRequest request, Map<String, List<String>> signedHeaders) {
Map<String, List<String>> newHeaders = Maps.newLinkedHashMap();
newHeaders.putAll(request.headers());
signedHeaders.forEach(
(name, signedValues) -> {
if (request.containsHeader(name)) {
Header[] original = request.getHeaders(name);
request.removeHeaders(name);
Arrays.asList(original)
.forEach(
header -> {
// Relocate headers if there is a conflict with signed headers
if (!values.contains(header.getValue())) {
request.addHeader(RELOCATED_HEADER_PREFIX + name, header.getValue());
}
});
List<String> originalValues = request.headers(name);
newHeaders.remove(name);
originalValues.forEach(
originalValue -> {
// Relocate headers if there is a conflict with signed headers
if (!signedValues.contains(originalValue)) {
newHeaders.compute(
RELOCATED_HEADER_PREFIX + name,
(k, v) -> {
if (v == null) {
return List.of(originalValue);
} else {
List<String> merged = Lists.newArrayList(v);
merged.add(originalValue);
return List.copyOf(merged);
}
});
}
});
}

values.forEach(value -> request.setHeader(name, value));
newHeaders.put(name, signedValues);
});
return newHeaders;
}
}
42 changes: 42 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/RESTSigv4AuthSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import org.apache.iceberg.rest.HTTPRequest;
import org.apache.iceberg.rest.auth.OAuth2Util;

/**
* An AuthSession that signs requests with SigV4.
*
* <p>It extends {@link OAuth2Util.AuthSession} to handle OAuth2 authentication as well.
*/
public class RESTSigv4AuthSession extends OAuth2Util.AuthSession {

private final RESTSigV4Signer signer;

public RESTSigv4AuthSession(OAuth2Util.AuthSession authSession, RESTSigV4Signer signer) {
super(authSession.headers(), authSession.config());
this.signer = signer;
}

@Override
public HTTPRequest authenticate(HTTPRequest request) {
return signer.sign(super.authenticate(request));
}
}
Loading