Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1568,4 +1568,19 @@ private Constants() {
* is true: {@value}.
*/
public static final String HTTP_SIGNER_CLASS_NAME = "fs.s3a.http.signer.class";

/**
* Should checksums be validated on download?
* This is slower and not needed on TLS connections.
* Value: {@value}.
*/
public static final String CHECKSUM_VALIDATION =
"fs.s3a.checksum.validation";

/**
* Default value of {@link #CHECKSUM_VALIDATION}.
* Value: {@value}.
*/
public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,15 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
configureEndpointAndRegion(builder, parameters, conf);

S3Configuration serviceConfiguration = S3Configuration.builder()
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.build();
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.checksumValidationEnabled(parameters.isChecksumValidationEnabled())
.build();

final ClientOverrideConfiguration.Builder override =
createClientOverrideConfiguration(parameters, conf);

S3BaseClientBuilder s3BaseClientBuilder = builder
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
.overrideConfiguration(override.build())
.credentialsProvider(parameters.getCredentialSet())
.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
.serviceConfiguration(serviceConfiguration);
Expand All @@ -204,8 +208,9 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
* @throws IOException any IOE raised, or translated exception
* @throws RuntimeException some failures creating an http signer
* @return the override configuration
* @throws IOException any IOE raised, or translated exception
*/
protected ClientOverrideConfiguration createClientOverrideConfiguration(
protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
S3ClientCreationParameters parameters, Configuration conf) throws IOException {
final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3);
Expand Down Expand Up @@ -237,7 +242,7 @@ protected ClientOverrideConfiguration createClientOverrideConfiguration(
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());

return clientOverrideConfigBuilder.build();
return clientOverrideConfigBuilder;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,9 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withRegion(configuredRegion)
.withFipsEnabled(fipsEnabled)
.withExpressCreateSession(
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT));
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
.withChecksumValidationEnabled(
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));

S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3Client(getUri(), parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,17 @@ public IOStatistics getIOStatistics() {
return ioStatistics;
}

/**
* Get the wrapped stream.
* This is for testing only.
*
* @return the wrapped stream, or null if there is none.
*/
@VisibleForTesting
public ResponseInputStream<GetObjectResponse> getWrappedStream() {
return wrappedStream;
}

/**
* Callbacks for input stream IO.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ final class S3ClientCreationParameters {
*/
private boolean expressCreateSession = S3EXPRESS_CREATE_SESSION_DEFAULT;

/**
* Enable checksum validation.
*/
private boolean checksumValidationEnabled;

/**
* Is FIPS enabled?
*/
Expand Down Expand Up @@ -451,6 +456,20 @@ public S3ClientCreationParameters withExpressCreateSession(final boolean value)
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withChecksumValidationEnabled(final boolean value) {
checksumValidationEnabled = value;
return this;
}

public boolean isChecksumValidationEnabled() {
return checksumValidationEnabled;
}

@Override
public String toString() {
return "S3ClientCreationParameters{" +
Expand All @@ -464,6 +483,7 @@ public String toString() {
", multipartCopy=" + multipartCopy +
", region='" + region + '\'' +
", expressCreateSession=" + expressCreateSession +
", checksumValidationEnabled=" + checksumValidationEnabled +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.internal.io.ChecksumValidatingInputStream;
import software.amazon.awssdk.services.s3.internal.checksums.S3ChecksumValidatingInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -1663,6 +1669,54 @@ public static S3AInputStream getS3AInputStream(
}
}

/**
* Get the inner stream of a FilterInputStream.
* Uses reflection to access a protected field.
* @param fis input stream.
* @return the inner stream.
*/
public static InputStream getInnerStream(FilterInputStream fis) {
try {
final Field field = FilterInputStream.class.getDeclaredField("in");
field.setAccessible(true);
return (InputStream) field.get(fis);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new AssertionError("Failed to get inner stream: " + e, e);
}
}

/**
* Get the innermost stream of a chain of FilterInputStreams.
* This allows tests into the internals of an AWS SDK stream chain.
* @param fis input stream.
* @return the inner stream.
*/
public static InputStream getInnermostStream(FilterInputStream fis) {
InputStream inner = fis;
while (inner instanceof FilterInputStream) {
inner = getInnerStream((FilterInputStream) inner);
}
return inner;
}

/**
* Verify that an s3a stream is not checksummed.
* The inner stream must be active.
*/
public static void assertStreamIsNotChecksummed(final S3AInputStream wrappedS3A) {
final ResponseInputStream<GetObjectResponse> wrappedStream =
wrappedS3A.getWrappedStream();
Assertions.assertThat(wrappedStream)
.describedAs("wrapped stream is not open: call read() on %s", wrappedS3A)
.isNotNull();

final InputStream inner = getInnermostStream(wrappedStream);
Assertions.assertThat(inner)
.describedAs("innermost stream of %s", wrappedS3A)
.isNotInstanceOf(ChecksumValidatingInputStream.class)
.isNotInstanceOf(S3ChecksumValidatingInputStream.class);
}

/**
* Disable Prefetching streams from S3AFileSystem in tests.
* @param conf Configuration to remove the prefetch property from.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


import java.io.EOFException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -45,8 +47,15 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
Expand Down Expand Up @@ -79,6 +88,16 @@ public ITestS3AOpenCost() {
super(true);
}

@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
CHECKSUM_VALIDATION);
conf.setBoolean(CHECKSUM_VALIDATION, false);
disableFilesystemCaching(conf);
return conf;
}

/**
* Setup creates a test file, saves is status and length
* to fields.
Expand Down Expand Up @@ -139,6 +158,34 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
assertEquals("bytes read from file", fileLength, readLen);
}

@Test
public void testStreamIsNotChecksummed() throws Throwable {
describe("Verify that an opened stream is not checksummed");
S3AFileSystem fs = getFileSystem();
// open the file
try (FSDataInputStream in = verifyMetrics(() ->
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.mustLong(FS_OPTION_OPENFILE_LENGTH, fileLength)
.build()
.get(),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0))) {

// if prefetching is enabled, skip this test
final InputStream wrapped = in.getWrappedStream();
if (!(wrapped instanceof S3AInputStream)) {
skip("Not an S3AInputStream: " + wrapped);
}

// open the stream.
in.read();
// now examine the innermost stream and make sure it doesn't have a checksum
assertStreamIsNotChecksummed(getS3AInputStream(in));
}
}

@Test
public void testOpenFileShorterLength() throws Throwable {
// do a second read with the length declared as short.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
Expand Down Expand Up @@ -84,6 +85,11 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
*/
public static final int ATTEMPTS = 10;

/**
* Should checksums be enabled?
*/
public static final boolean CHECKSUMS = false;

/**
* Test FS with a tiny connection pool and
* no recovery.
Expand All @@ -102,6 +108,7 @@ public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
ASYNC_DRAIN_THRESHOLD,
CHECKSUM_VALIDATION,
ESTABLISH_TIMEOUT,
INPUT_FADVISE,
MAX_ERROR_RETRIES,
Expand All @@ -111,7 +118,7 @@ public Configuration createConfiguration() {
REQUEST_TIMEOUT,
RETRY_LIMIT,
SOCKET_TIMEOUT);

conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
return conf;
}

Expand All @@ -132,6 +139,7 @@ public void setup() throws Exception {
conf.setInt(MAX_ERROR_RETRIES, 1);
conf.setInt(READAHEAD_RANGE, READAHEAD);
conf.setInt(RETRY_LIMIT, 1);
conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
setDurationAsSeconds(conf, ESTABLISH_TIMEOUT,
Duration.ofSeconds(1));

Expand Down Expand Up @@ -221,12 +229,22 @@ private static long lookupCounter(
*/
private static void assertReadPolicy(final FSDataInputStream in,
final S3AInputPolicy policy) {
S3AInputStream inner = (S3AInputStream) in.getWrappedStream();
S3AInputStream inner = getS3AInputStream(in);
Assertions.assertThat(inner.getInputPolicy())
.describedAs("input policy of %s", inner)
.isEqualTo(policy);
}

/**
* Extract the inner stream from an FSDataInputStream.
* Because prefetching is disabled, this is always an S3AInputStream.
* @param in input stream
* @return the inner stream cast to an S3AInputStream.
*/
private static S3AInputStream getS3AInputStream(final FSDataInputStream in) {
return (S3AInputStream) in.getWrappedStream();
}

/**
* Test stream close performance/behavior with unbuffer
* aborting rather than draining.
Expand Down