Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,18 @@ private Constants() {
*/
public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";

/**
* Explicit request for the SDK region resolution.
* Value: {@code}.
*/
public static final String SDK_REGION = "sdk";

/**
* An empty region is the historic fall-through to the SDK.
* Value: ""
*/
public static final String EMPTY_REGION = "";

/**
* Flag for create performance.
* This can be set in the {code createFile()} builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
Expand Down Expand Up @@ -57,17 +51,14 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.fs.s3a.impl.RegionResolution;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT;
Expand All @@ -77,7 +68,8 @@
import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.s3a.impl.RegionResolution.RegionResolutionMechanism.Sdk;
import static org.apache.hadoop.fs.s3a.impl.RegionResolution.calculateRegion;


/**
Expand All @@ -92,40 +84,17 @@ public class DefaultS3ClientFactory extends Configured

private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

private static final String S3_SERVICE_NAME = "s3";

private static final Pattern VPC_ENDPOINT_PATTERN =
Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$");

/**
* Subclasses refer to this.
*/
protected static final Logger LOG =
LoggerFactory.getLogger(DefaultS3ClientFactory.class);

/**
* A one-off warning of default region chains in use.
*/
private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
new LogExactlyOnce(LOG);

/**
* Warning message printed when the SDK Region chain is in use.
* Message printed when the SDK Region chain is in use.
*/
private static final String SDK_REGION_CHAIN_IN_USE =
"S3A filesystem client is using"
+ " the SDK region resolution chain.";


/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);

/**
* Error message when an endpoint is set with FIPS enabled: {@value}.
*/
@VisibleForTesting
public static final String ERROR_ENDPOINT_WITH_FIPS =
"Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true";
"S3A filesystem client is using the SDK region resolution chain.";

/**
* A one-off log stating whether S3 Access Grants are enabled.
Expand Down Expand Up @@ -307,174 +276,58 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(

/**
* This method configures the endpoint and region for a S3 client.
* The order of configuration is:
*
* <ol>
* <li>If region is configured via fs.s3a.endpoint.region, use it.</li>
* <li>If endpoint is configured via via fs.s3a.endpoint, set it.
* If no region is configured, try to parse region from endpoint. </li>
* <li> If no region is configured, and it could not be parsed from the endpoint,
* set the default region as US_EAST_2</li>
* <li> If configured region is empty, fallback to SDK resolution chain. </li>
* <li> S3 cross region is enabled by default irrespective of region or endpoint
* is set or not.</li>
* </ol>
*
* See {@link RegionResolution} for the details.
* @param builder S3 client builder.
* @param parameters parameter object
* @param conf conf configuration object
* @param conf conf configuration object
* @param <BuilderT> S3 client builder type
* @param <ClientT> S3 client type
* @return how the region was resolved.
* @throws IllegalArgumentException if endpoint is set when FIPS is enabled.
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
final String endpointStr = parameters.getEndpoint();
final URI endpoint = getS3Endpoint(endpointStr, conf);

final String configuredRegion = parameters.getRegion();
Region region = null;
String origin = "";

// If the region was configured, set it.
if (configuredRegion != null && !configuredRegion.isEmpty()) {
origin = AWS_REGION;
region = Region.of(configuredRegion);
}
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> RegionResolution.Resolution
configureEndpointAndRegion(BuilderT builder,
S3ClientCreationParameters parameters,
Configuration conf) throws IOException {

// FIPs? Log it, then reject any attempt to set an endpoint
final boolean fipsEnabled = parameters.isFipsEnabled();
if (fipsEnabled) {
LOG.debug("Enabling FIPS mode");
}
// always setting it guarantees the value is non-null,
final RegionResolution.Resolution resolution =
calculateRegion(parameters, conf);
LOG.debug("Region Resolution: {}", resolution);

// always setting to true or false guarantees the value is non-null,
// which tests expect.
builder.fipsEnabled(fipsEnabled);

if (endpoint != null) {
boolean endpointEndsWithCentral =
endpointStr.endsWith(CENTRAL_ENDPOINT);
checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s",
ERROR_ENDPOINT_WITH_FIPS,
endpoint);

// No region was configured,
// determine the region from the endpoint.
if (region == null) {
region = getS3RegionFromEndpoint(endpointStr,
endpointEndsWithCentral);
if (region != null) {
origin = "endpoint";
}
}
builder.fipsEnabled(resolution.isUseFips());

// No need to override endpoint with "s3.amazonaws.com".
// Let the client take care of endpoint resolution. Overriding
// the endpoint with "s3.amazonaws.com" causes 400 Bad Request
// errors for non-existent buckets and objects.
// ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
if (!endpointEndsWithCentral) {
builder.endpointOverride(endpoint);
LOG.debug("Setting endpoint to {}", endpoint);
} else {
origin = "central endpoint with cross region access";
LOG.debug("Enabling cross region access for endpoint {}",
endpointStr);
}
}
if (Sdk != resolution.getMechanism()) {

if (region != null) {
builder.region(region);
} else if (configuredRegion == null) {
// no region is configured, and none could be determined from the endpoint.
// Use US_EAST_2 as default.
region = Region.of(AWS_S3_DEFAULT_REGION);
builder.region(region);
origin = "cross region access fallback";
} else if (configuredRegion.isEmpty()) {
// region configuration was set to empty string.
// allow this if people really want it; it is OK to rely on this
// when deployed in EC2.
WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE);
LOG.debug(SDK_REGION_CHAIN_IN_USE);
origin = "SDK region chain";
}
boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED,
AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT);
// s3 cross region access
if (isCrossRegionAccessEnabled) {
builder.crossRegionAccessEnabled(true);
// a region has been determined from configuration,
// or it is falling back to central region.

final Region region = resolution.getRegion();
builder.region(requireNonNull(region));
// s3 cross region access
if (resolution.isCrossRegionAccessEnabled()) {
builder.crossRegionAccessEnabled(true);
}
final URI endpointUri = resolution.getEndpointUri();
if (endpointUri != null && !resolution.isUseCentralEndpoint()) {
LOG.debug("Setting endpoint to {}", endpointUri);
builder.endpointOverride(endpointUri);
}
}
LOG.debug("Setting region to {} from {} with cross region access {}",
region, origin, isCrossRegionAccessEnabled);
return resolution;
}

/**
* Given a endpoint string, create the endpoint URI.
*
* <p>Kept in as subclasses use it.
* @param endpoint possibly null endpoint.
* @param conf config to build the URI from.
* @return an endpoint uri
*/
protected static URI getS3Endpoint(String endpoint, final Configuration conf) {

boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);

String protocol = secureConnections ? "https" : "http";

if (endpoint == null || endpoint.isEmpty()) {
// don't set an endpoint if none is configured, instead let the SDK figure it out.
return null;
}

if (!endpoint.contains("://")) {
endpoint = String.format("%s://%s", protocol, endpoint);
}

try {
return new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Parses the endpoint to get the region.
* If endpoint is the central one, use US_EAST_2.
*
* @param endpoint the configure endpoint.
* @param endpointEndsWithCentral true if the endpoint is configured as central.
* @return the S3 region, null if unable to resolve from endpoint.
*/
@VisibleForTesting
static Region getS3RegionFromEndpoint(final String endpoint,
final boolean endpointEndsWithCentral) {

if (!endpointEndsWithCentral) {
// S3 VPC endpoint parsing
Matcher matcher = VPC_ENDPOINT_PATTERN.matcher(endpoint);
if (matcher.find()) {
LOG.debug("Mapping to VPCE");
LOG.debug("Endpoint {} is vpc endpoint; parsing region as {}", endpoint, matcher.group(1));
return Region.of(matcher.group(1));
}

LOG.debug("Endpoint {} is not the default; parsing", endpoint);
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
}

// Select default region here to enable cross-region access.
// If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
// Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
// This applies to Spark versions with the changes of SPARK-35878.
// ref:
// https://github.com/apache/spark/blob/v3.5.0/core/
// src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
// If we do not allow cross region access, Spark would not be able to
// access any bucket that is not present in the given region.
// Hence, we should use default region us-east-2 to allow cross-region
// access.
return Region.of(AWS_S3_DEFAULT_REGION);
return RegionResolution.buildEndpointUri(endpoint, secureConnections);
}

private static <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -99,13 +100,14 @@ public static void bindSSLChannelMode(Configuration conf,

/**
* Is this an AWS endpoint? looks at end of FQDN.
* @param endpoint endpoint
* @return true if the endpoint matches the requirements for an aws endpoint.
* @param endpoint endpoint.
* @return true iff this is non-empty or ends with amazonaws.com or amazonaws.com.cn
*/
public static boolean isAwsEndpoint(final String endpoint) {
final String host = endpoint.toLowerCase(Locale.ROOT);
return (endpoint.isEmpty()
|| endpoint.endsWith(".amazonaws.com")
|| endpoint.endsWith(".amazonaws.com.cn"));
|| host.endsWith(".amazonaws.com")
|| host.endsWith(".amazonaws.com.cn"));
}

/**
Expand Down
Loading
Loading