Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ org.xerial.snappy:snappy-java:1.1.10.4
org.yaml:snakeyaml:2.0
org.wildfly.openssl:wildfly-openssl:2.1.4.Final
ro.isdc.wro4j:wro4j-maven-plugin:1.8.0
software.amazon.awssdk:bundle:2.25.53
software.amazon.awssdk:bundle:2.30.27
net.jodah:failsafe:2.4.4

--------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
<make-maven-plugin.version>1.0-beta-1</make-maven-plugin.version>
<surefire.fork.timeout>900</surefire.fork.timeout>
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
<aws-java-sdk-v2.version>2.30.27</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.slf4j.event.Level;
import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
Expand All @@ -38,6 +39,7 @@
import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.metrics.LoggingMetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand All @@ -53,6 +55,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.impl.MD5RequiredOperationInterceptor;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

Expand Down Expand Up @@ -211,12 +214,25 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
final ClientOverrideConfiguration.Builder override =
createClientOverrideConfiguration(parameters, conf);

S3BaseClientBuilder s3BaseClientBuilder = builder
S3BaseClientBuilder<BuilderT, ClientT> s3BaseClientBuilder = builder
.overrideConfiguration(override.build())
.credentialsProvider(parameters.getCredentialSet())
.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
.serviceConfiguration(serviceConfiguration);

if (LOG.isTraceEnabled()) {
// if this log is set to debug then we turn on logging of SDK metrics.
// The metrics itself will log at info; it is just that reflection work
// would be needed to change that setting safely for shaded and unshaded aws artifacts.
s3BaseClientBuilder.overrideConfiguration(o ->
o.addMetricPublisher(LoggingMetricPublisher.create()));
}

// Force adding MD5 checksums to requests which need it now that SDK doesn't.
s3BaseClientBuilder.overrideConfiguration(o ->
override.addExecutionInterceptor(new MD5RequiredOperationInterceptor()));


if (conf.getBoolean(HTTP_SIGNER_ENABLED, HTTP_SIGNER_ENABLED_DEFAULT)) {
// use an http signer through an AuthScheme
final AuthScheme<AwsCredentialsIdentity> signer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,6 +63,12 @@ public class IAMInstanceCredentialsProvider
private static final Logger LOG =
LoggerFactory.getLogger(IAMInstanceCredentialsProvider.class);

/**
* How far in advance of credential expiry must IAM credentials be refreshed.
* See HADOOP-19181. S3A: IAMCredentialsProvider throttling results in AWS auth failures
*/
public static final Duration TIME_BEFORE_EXPIRY = Duration.ofMinutes(1);

/**
* The credentials provider.
* Initially a container credentials provider, but if that fails
Expand Down Expand Up @@ -130,8 +137,12 @@ private synchronized AwsCredentials getCredentials() {
// close it to shut down any thread
iamCredentialsProvider.close();
isContainerCredentialsProvider = false;

// create an async credentials provider with a safe credential
// expiry time.
iamCredentialsProvider = InstanceProfileCredentialsProvider.builder()
.asyncCredentialUpdateEnabled(true)
.staleTime(TIME_BEFORE_EXPIRY)
.build();
return iamCredentialsProvider.resolveCredentials();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ private AwsSdkWorkarounds() {
* @return true if the log tuning operation took place.
*/
public static boolean prepareLogging() {
return LogControllerFactory.createController().
setLogLevel(TRANSFER_MANAGER, LogControl.LogLevel.ERROR);
return true;
}

/**
Expand All @@ -53,7 +52,6 @@ public static boolean prepareLogging() {
*/
@VisibleForTesting
static boolean restoreNoisyLogging() {
return LogControllerFactory.createController().
setLogLevel(TRANSFER_MANAGER, LogControl.LogLevel.INFO);
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/


package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

import software.amazon.awssdk.checksums.DefaultChecksumAlgorithm;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.checksums.ChecksumSpecs;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
import software.amazon.awssdk.core.internal.util.HttpChecksumUtils;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.utils.Md5Utils;


/**
* Taken from AWS engineer discussion on how to address incompatible changes
* in SDKs.
* @see https://github.com/aws/aws-sdk-java-v2/discussions/5802
*/

public final class MD5RequiredOperationInterceptor implements ExecutionInterceptor {

@Override
public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
ExecutionAttributes executionAttributes) {
boolean isHttpChecksumRequired = isHttpChecksumRequired(executionAttributes);
boolean requestAlreadyHasMd5 =
context.httpRequest().firstMatchingHeader(Header.CONTENT_MD5).isPresent();

Optional<RequestBody> syncContent = context.requestBody();
Optional<AsyncRequestBody> asyncContent = context.asyncRequestBody();

if (!isHttpChecksumRequired || requestAlreadyHasMd5) {
return context.httpRequest();
}

if (asyncContent.isPresent()) {
throw new IllegalStateException("This operation requires a content-MD5 checksum, " +
"but one cannot be calculated for non-blocking content.");
}

if (syncContent.isPresent()) {
try {
String payloadMd5 =
Md5Utils.md5AsBase64(syncContent.get().contentStreamProvider().newStream());
return context.httpRequest().copy(r -> r.putHeader(Header.CONTENT_MD5, payloadMd5));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return context.httpRequest();
}

private boolean isHttpChecksumRequired(ExecutionAttributes executionAttributes) {
return executionAttributes.getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM_REQUIRED)
!= null
|| isMd5ChecksumRequired(executionAttributes);
}

public static boolean isMd5ChecksumRequired(ExecutionAttributes executionAttributes) {
ChecksumSpecs resolvedChecksumSpecs = getResolvedChecksumSpecs(executionAttributes);
if (resolvedChecksumSpecs == null) {
return false;
} else {
return resolvedChecksumSpecs.algorithm() == null
&& resolvedChecksumSpecs.isRequestChecksumRequired();
}
}

public static ChecksumSpecs getResolvedChecksumSpecs(ExecutionAttributes executionAttributes) {
ChecksumSpecs checksumSpecs =
executionAttributes.getAttribute(SdkExecutionAttribute.RESOLVED_CHECKSUM_SPECS);
return checksumSpecs != null ? checksumSpecs : resolveChecksumSpecs(executionAttributes);
}

public static ChecksumSpecs resolveChecksumSpecs(ExecutionAttributes executionAttributes) {
HttpChecksum httpChecksumTraitInOperation =
executionAttributes.getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM);
if (httpChecksumTraitInOperation == null) {
return null;
} else {
boolean hasRequestValidation = httpChecksumTraitInOperation.requestValidationMode() != null;
String requestAlgorithm = httpChecksumTraitInOperation.requestAlgorithm();
String checksumHeaderName =
requestAlgorithm != null ? HttpChecksumUtils.httpChecksumHeader(requestAlgorithm) : null;
return ChecksumSpecs.builder()
.algorithmV2(DefaultChecksumAlgorithm.fromValue(requestAlgorithm))
.headerName(checksumHeaderName)
.responseValidationAlgorithmsV2(httpChecksumTraitInOperation.responseAlgorithmsV2())
.isValidationEnabled(hasRequestValidation)
.isRequestChecksumRequired(httpChecksumTraitInOperation.isRequestChecksumRequired())
.isRequestStreaming(httpChecksumTraitInOperation.isRequestStreaming())
.requestAlgorithmHeader(httpChecksumTraitInOperation.requestAlgorithmHeader())
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,48 @@ execchain.MainClientExec (MainClientExec.java:execute(284)) - Connection can be

```


To log the output of the AWS SDK metrics, set the log
`org.apache.hadoop.fs.s3a.DefaultS3ClientFactory` to `TRACE`.
This will then turn on logging of the internal SDK metrics.4

These will actually be logged at INFO in the log
```
software.amazon.awssdk.metrics.LoggingMetricPublisher
```

```text
INFO metrics.LoggingMetricPublisher (LoggerAdapter.java:info(165)) - Metrics published:
MetricCollection(name=ApiCall, metrics=[
MetricRecord(metric=MarshallingDuration, value=PT0.000092041S),
MetricRecord(metric=RetryCount, value=0),
MetricRecord(metric=ApiCallSuccessful, value=true),
MetricRecord(metric=OperationName, value=DeleteObject),
MetricRecord(metric=EndpointResolveDuration, value=PT0.000132792S),
MetricRecord(metric=ApiCallDuration, value=PT0.064890875S),
MetricRecord(metric=CredentialsFetchDuration, value=PT0.000017458S),
MetricRecord(metric=ServiceEndpoint, value=https://buckets3.eu-west-2.amazonaws.com),
MetricRecord(metric=ServiceId, value=S3)], children=[
MetricCollection(name=ApiCallAttempt, metrics=[
MetricRecord(metric=TimeToFirstByte, value=PT0.06260225S),
MetricRecord(metric=SigningDuration, value=PT0.000293083S),
MetricRecord(metric=ReadThroughput, value=0.0),
MetricRecord(metric=ServiceCallDuration, value=PT0.06260225S),
MetricRecord(metric=HttpStatusCode, value=204),
MetricRecord(metric=BackoffDelayDuration, value=PT0S),
MetricRecord(metric=TimeToLastByte, value=PT0.064313667S),
MetricRecord(metric=AwsRequestId, value=RKZD44SE5DW91K1G)], children=[
MetricCollection(name=HttpClient, metrics=[
MetricRecord(metric=AvailableConcurrency, value=1),
MetricRecord(metric=LeasedConcurrency, value=0),
MetricRecord(metric=ConcurrencyAcquireDuration, value=PT0S),
MetricRecord(metric=PendingConcurrencyAcquires, value=0),
MetricRecord(metric=MaxConcurrency, value=512),
MetricRecord(metric=HttpClientName, value=Apache)], children=[])
])
])
```

### <a name="audit-logging"></a> Enable S3 Server-side Logging

The [Auditing](auditing) feature of the S3A connector can be used to generate
Expand Down
Loading