Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HADOOP-18562]: S3A: support custom S3 and STS headers. #6550

Open
wants to merge 12 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,29 @@ private Constants() {
"fs.s3a." + Constants.AWS_SERVICE_IDENTIFIER_STS.toLowerCase()
+ ".signing-algorithm";

/** Prefix for S3A client-specific properties: {@value}. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add an {@value} tag for this and the strings below, for better IDE experience

public static final String FS_S3A_CLIENT_PREFIX = "fs.s3a.client.";

/**
* Postfix appended to the per-service client prefix to form the
* custom headers option name: {@value}.
*/
public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";

/**
* List of custom headers to be set on the service client.
* Multiple parameters can be used to specify custom headers.
* fs.s3a.client.s3.custom.headers - headers to add on all the s3 requests.
* fs.s3a.client.sts.custom.headers - headers to add on all the sts requests.
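* The value is a comma-separated list of {@code header=value} pairs;
* multiple values for one header are separated by ':'.
* Examples
* CustomHeader {@literal ->} 'Header1=Value1'
* CustomHeaders {@literal ->} 'Header1=Value1:Value2,Header2=Value1'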
*/
public static final String CUSTOM_HEADERS_STS =
FS_S3A_CLIENT_PREFIX + Constants.AWS_SERVICE_IDENTIFIER_STS.toLowerCase()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. no Constants reference needed here or below
  2. Use toLowerCase(Locale.ROOT) unless you want to field support calls from unusual locales

+ CUSTOM_HEADERS_POSTFIX;

/**
* Option listing the custom headers to add to all S3 requests;
* documented as {@code fs.s3a.client.s3.custom.headers}.
* The value is a comma-separated list of {@code header=value} pairs;
* multiple values for one header are separated by ':'.
* The service identifier is lower-cased with {@code Locale.ROOT} so the
* key does not depend on the JVM's default locale.
*/
public static final String CUSTOM_HEADERS_S3 =
    FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(java.util.Locale.ROOT)
    + CUSTOM_HEADERS_POSTFIX;

/** Suffix string {@value}; NOTE(review): presumably the legacy S3N folder marker suffix - confirm. */
public static final String S3N_FOLDER_SUFFIX = "_$folder$";
/** Configuration key {@value}; NOTE(review): presumably the S3A block size option - confirm. */
public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
/** The string {@value}; NOTE(review): presumably the S3A filesystem URI scheme - confirm. */
public static final String FS_S3A = "s3a";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down Expand Up @@ -72,6 +75,8 @@
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -116,6 +121,8 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf

initUserAgent(conf, overrideConfigBuilder);

initRequestHeaders(conf, overrideConfigBuilder, awsServiceIdentifier);

String signer = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signer.isEmpty()) {
LOG.debug("Signer override = {}", signer);
Expand Down Expand Up @@ -407,6 +414,36 @@ private static void initSigner(Configuration conf,
}
}

/**
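* Initialize custom request headers on the client configuration.
* Headers are read from {@link Constants#CUSTOM_HEADERS_S3} or
* {@link Constants#CUSTOM_HEADERS_STS} according to the service identifier;
* any other service gets no custom headers.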
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need a title here

* @param conf hadoop configuration
* @param clientConfig client configuration to update
* @param awsServiceIdentifier service name
*/
private static void initRequestHeaders(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
// Select the configuration option holding the custom headers for this service.
String configKey = null;
switch (awsServiceIdentifier) {
case AWS_SERVICE_IDENTIFIER_S3:
configKey = CUSTOM_HEADERS_S3;
break;
case AWS_SERVICE_IDENTIFIER_STS:
configKey = CUSTOM_HEADERS_STS;
break;
default:
// No custom headers option is defined for other services; configKey stays null.
}
if (configKey != null) {
// Map of header name to its unsplit value string
// (presumably parsed from comma-separated header=value pairs; see the helper).
Map<String, String> awsClientCustomHeadersMap =
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
awsClientCustomHeadersMap.forEach((header, valueString) -> {
// ':' separates multiple values of one header, so a single value cannot contain ':'.
List<String> headerValues = Arrays.asList(valueString.split(":"));
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
clientConfig.putHeader(header, headerValues);
});
// NOTE(review): this logs header values as well as names;
// confirm no custom header is expected to carry a secret.
LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());
}
}

/**
* Configures request timeout in the client configuration.
* This is independent of the timeouts set in the sync and async HTTP clients;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;

Expand All @@ -29,11 +30,16 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.util.Lists;

import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
Expand All @@ -48,6 +54,7 @@
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createClientConfigBuilder;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;

Expand Down Expand Up @@ -201,4 +208,56 @@ public void testCreateApiConnectionSettingsDefault() {
private void setOptionsToValue(String value, Configuration conf, String... keys) {
  // Assign the same value to every key passed in.
  for (String key : Arrays.asList(keys)) {
    conf.set(key, value);
  }
}

/**
* If {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_STS} is set,
* verify that the client configuration built for STS has the desired headers
* and that the client configuration built for S3 has none.
*/
@Test
public void testInitRequestHeadersForSTS() throws IOException {
final Configuration conf = new Configuration();
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
// Two headers: "foo" with two ':'-separated values, "qux" with one.
conf.set(CUSTOM_HEADERS_STS, "foo=bar:baz,qux=quux");
// Setting the STS option must leave the S3 option unset.
Assertions.assertThat(conf.get(CUSTOM_HEADERS_S3))
.describedAs("Custom client headers for s3 %s", CUSTOM_HEADERS_S3)
.isNull();

// The S3 client configuration picks up no headers from the STS option.
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3).headers().size())
.describedAs("Count of S3 client headers")
.isEqualTo(0);
// The STS client configuration carries both headers with their split values.
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS).headers().size())
.describedAs("Count of STS client headers")
.isEqualTo(2);
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS).headers().get("foo"))
.describedAs("STS client 'foo' header value")
.isEqualTo(Lists.newArrayList("bar", "baz"));
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS).headers().get("qux"))
.describedAs("STS client 'qux' header value")
.isEqualTo(Lists.newArrayList("quux"));
}

/**
* Verify that setting {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3}
* adds the configured headers to the S3 client configuration only:
* the STS client configuration must stay free of custom headers.
*/
@Test
public void testInitRequestHeadersForS3() throws IOException {
  final Configuration conf = new Configuration();
  conf.set(CUSTOM_HEADERS_S3, "foo=bar:baz,qux=quux");

  // The STS option must not have been set as a side effect.
  Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
      .describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
      .isNull();

  // STS client: no headers at all.
  Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS).headers())
      .describedAs("Count of STS client headers")
      .isEmpty();

  // S3 client: exactly the two configured headers, values split on ':'.
  Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3).headers())
      .describedAs("Count of S3 client headers")
      .hasSize(2);
  Assertions.assertThat(
      createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3).headers().get("foo"))
      .describedAs("S3 client 'foo' header value")
      .containsExactly("bar", "baz");
  Assertions.assertThat(
      createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3).headers().get("qux"))
      .describedAs("S3 client 'qux' header value")
      .containsExactly("quux");
}
}
Loading