Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2489,7 +2489,7 @@ UploadResult executePut(PutObjectRequest putObjectRequest,
* Wait for an upload to complete.
* If the waiting for completion is interrupted, the upload will be
* aborted before an {@code InterruptedIOException} is thrown.
* @param upload upload to wait for
* @param uploadInfo upload to wait for
* @param key destination key
* @return the upload result
* @throws InterruptedIOException if the blocking was interrupted.
Expand Down Expand Up @@ -2592,7 +2592,7 @@ private void copyFile(String srcKey, String dstKey, long size)
setOptionalObjectMetadata(dstom);
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
setOptionalCopyObjectRequestParameters(copyObjectRequest);
setOptionalCopyObjectRequestParameters(srcom, copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
Copy copy = transfers.copy(copyObjectRequest);
Expand All @@ -2608,6 +2608,49 @@ private void copyFile(String srcKey, String dstKey, long size)
});
}

/**
* Propagate encryption parameters from source file if set else use the
* current filesystem encryption settings.
* @param srcom source object meta.
* @param copyObjectRequest copy object request body.
*/
private void setOptionalCopyObjectRequestParameters(
ObjectMetadata srcom,
CopyObjectRequest copyObjectRequest) {
String sourceKMSId = srcom.getSSEAwsKmsKeyId();
if (isNotEmpty(sourceKMSId)) {
// source KMS ID is propagated
LOG.debug("Propagating SSE-KMS settings from source {}",
sourceKMSId);
copyObjectRequest.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(sourceKMSId));
}
switch(getServerSideEncryptionAlgorithm()) {
/**
* Overriding with client encryption settings.
*/
case SSE_C:
if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
//at the moment, only supports copy using the same key
SSECustomerKey customerKey = generateSSECustomerKey();
copyObjectRequest.setSourceSSECustomerKey(customerKey);
copyObjectRequest.setDestinationSSECustomerKey(customerKey);
}
break;
case SSE_KMS:
copyObjectRequest.setSSEAwsKeyManagementParams(
generateSSEAwsKeyParams()
);
break;
default:
}
}

/**
* Set the optional parameters when initiating the request (encryption,
* headers, storage, etc).
* @param req request to patch.
*/
protected void setOptionalMultipartUploadRequestParameters(
InitiateMultipartUploadRequest req) {
switch (serverSideEncryptionAlgorithm) {
Expand Down Expand Up @@ -2657,26 +2700,6 @@ InitiateMultipartUploadResult initiateMultipartUpload(
return getAmazonS3Client().initiateMultipartUpload(request);
}

protected void setOptionalCopyObjectRequestParameters(
CopyObjectRequest copyObjectRequest) throws IOException {
switch (serverSideEncryptionAlgorithm) {
case SSE_KMS:
copyObjectRequest.setSSEAwsKeyManagementParams(
generateSSEAwsKeyParams()
);
break;
case SSE_C:
if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
//at the moment, only supports copy using the same key
SSECustomerKey customerKey = generateSSECustomerKey();
copyObjectRequest.setSourceSSECustomerKey(customerKey);
copyObjectRequest.setDestinationSSECustomerKey(customerKey);
}
break;
default:
}
}

private void setOptionalPutRequestParameters(PutObjectRequest request) {
switch (serverSideEncryptionAlgorithm) {
case SSE_KMS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
*/
public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
implements S3ATestConstants {

protected static final Logger LOG =
LoggerFactory.getLogger(AbstractS3ATestBase.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.net.util.Base64;
import java.io.IOException;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;

import java.io.IOException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;

/**
* Test whether or not encryption works by turning it on. Some checks
Expand All @@ -42,11 +46,24 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
getSSEAlgorithm().getMethod());
patchConfigurationEncryptionSettings(conf);
return conf;
}

/**
* This removes the encryption settings from the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not "removes the encryption settings from the configuration". Could you explain why that is not required in branch-3.2 and/or update the java doc here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes comment is misleading. I had to fix the merge conflict and as removeBaseAndBucketOverrides() was not present earlier I had to delete that line thinking it is not required.

Although I can see the same method is present in S3ATestUtils but with different parameters and it is not used in this test.
@steveloughran Do you also think tests might break here if bucket level conf are set?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried setting the bucket level conf for fs.s3a.bucket.mthakur-data.server-side-encryption.key and some tests for eg. ITestS3AEncryptionSSES3 started failing. I even tried adding the removeBaseAndBucketOverrides call but it doesn't help.

I tried setting then same bucket level conf in apache/trunk as well and the tests are succeeding there.

The reason for test failure is
java.io.IOException: AES256 is enabled but an encryption key was set in fs.s3a.server-side-encryption.key (key of length 76 ending with 8) at org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm(S3AUtils.java:1440) at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:316) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3298) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:476)

removeBaseAndBucketOverrides() call essentially removes this parameter not sure how it is getting added in FileSystem initialisation flow again. It is too hard to compare and debug the difference between branch-3.2 and trunk as there are many changes. Any pointers in this regards would be greatly appreciated.

Also could someone please trigger the test based on their configs and share the results. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reproduce the test failure. I ran the test with fs.s3a.bucket.BUCKETNAME.server-side-encryption.key set in auth-keys.xml.

I set the S3 bucket default encryption policy as SSE-KMS, and that is not changing the ITestS3AEncryptionSSES3 test failure (as expected). Well, test ITestS3AEncryptionWithDefaultS3Settings could pass then (again, as expected).

Copy link
Member

@liuml07 liuml07 Mar 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I did some investigation, and the observation is as following:

  1. The failing tests are just about tests. The fix in copy options should work as expected. Specially, some encryption tests fail only when bucket level encryption is set.
  2. The inconsistent encryption algorithm and key situation in conf is partially because of the addConfResource(CONTRACT_XML); function after creating and patching the conf object, see here.
  3. The call to addConfResource(CONTRACT_XML); should be considered a tricky unrelated bug, not brought by this patch. Most likely it is fixed by HADOOP-16626 in trunk. Guess that's why we don't see test failures there. I'm wondering how trunk is not failing this...? Was HADOOP-16626 enough?
  4. Specially, there are some discussions about the Configuration loading resources "after" the conf object has been patched when setting up the S3 tests. In our test case, if you have the change to call removeBucketOverrides(), and you have the change in S3AContract not calling addConfResource(CONTRACT_XML);, then you can test with this: you add multiple testEncryption() and multiple testEncryptionOverRename() test cases in AbstractTestS3AEncryption (example). Guess what, only one test fail and all others are failing...How interesting? This seems very related to static resource loading when creating S3AFileSystem...

I'll stop here since I for now have no more context. I will request @steveloughran to provide more help. Shall we backport HADOOP-16626 to branch-3.2 and see?

* configuration and then sets the
* fs.s3a.server-side-encryption-algorithm value to
* be that of {@code getSSEAlgorithm()}.
* Called in {@code createConfiguration()}.
* @param conf configuration to patch.
*/
protected void patchConfigurationEncryptionSettings(
final Configuration conf) {
conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
getSSEAlgorithm().getMethod());
}

private static final int[] SIZES = {
0, 1, 2, 3, 4, 5, 254, 255, 256, 257, 2 ^ 12 - 1
};
Expand All @@ -66,10 +83,15 @@ public void testEncryptionOverRename() throws Throwable {
S3AFileSystem fs = getFileSystem();
writeDataset(fs, src, data, data.length, 1024 * 1024, true);
ContractTestUtils.verifyFileContents(fs, src, data);
Path dest = path(src.getName() + "-copy");
fs.rename(src, dest);
ContractTestUtils.verifyFileContents(fs, dest, data);
assertEncrypted(dest);
// this file will be encrypted
assertEncrypted(src);

Path targetDir = path("target");
mkdirs(targetDir);
fs.rename(src, targetDir);
Path renamedFile = new Path(targetDir, src.getName());
ContractTestUtils.verifyFileContents(fs, renamedFile, data);
assertEncrypted(renamedFile);
}

protected void validateEncryptionForFilesize(int len) throws IOException {
Expand All @@ -95,40 +117,14 @@ protected String createFilename(String name) {
* @throws IOException on a failure
*/
protected void assertEncrypted(Path path) throws IOException {
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
switch(getSSEAlgorithm()) {
case SSE_C:
assertEquals("AES256", md.getSSECustomerAlgorithm());
String md5Key = convertKeyToMd5();
assertEquals(md5Key, md.getSSECustomerKeyMd5());
break;
case SSE_KMS:
assertEquals("aws:kms", md.getSSEAlgorithm());
//S3 will return full arn of the key, so specify global arn in properties
assertEquals(this.getConfiguration().
getTrimmed(Constants.SERVER_SIDE_ENCRYPTION_KEY),
md.getSSEAwsKmsKeyId());
break;
default:
assertEquals("AES256", md.getSSEAlgorithm());
}
}

/**
* Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then
* gets the md5 of it, then encodes it in base64 so it will match the version
* that AWS returns to us.
*
* @return md5'd base64 encoded representation of the server side encryption
* key
*/
private String convertKeyToMd5() {
String base64Key = getConfiguration().getTrimmed(
Constants.SERVER_SIDE_ENCRYPTION_KEY
);
byte[] key = Base64.decodeBase64(base64Key);
byte[] md5 = DigestUtils.md5(key);
return Base64.encodeBase64String(md5).trim();
//S3 will return full arn of the key, so specify global arn in properties
String kmsKeyArn = this.getConfiguration().
getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
S3AEncryptionMethods algorithm = getSSEAlgorithm();
EncryptionTestUtils.assertEncrypted(getFileSystem(),
path,
algorithm,
kmsKeyArn);
}

protected abstract S3AEncryptionMethods getSSEAlgorithm();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.IOException;

import com.amazonaws.services.s3.model.ObjectMetadata;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public final class EncryptionTestUtils {

/** Private constructor */
private EncryptionTestUtils() {
}

public static final String AWS_KMS_SSE_ALGORITHM = "aws:kms";

public static final String SSE_C_ALGORITHM = "AES256";

/**
* Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then
* gets the md5 of it, then encodes it in base64 so it will match the version
* that AWS returns to us.
*
* @return md5'd base64 encoded representation of the server side encryption
* key
*/
public static String convertKeyToMd5(FileSystem fs) {
String base64Key = fs.getConf().getTrimmed(
SERVER_SIDE_ENCRYPTION_KEY
);
byte[] key = Base64.decodeBase64(base64Key);
byte[] md5 = DigestUtils.md5(key);
return Base64.encodeBase64String(md5).trim();
}

/**
* Assert that a path is encrypted with right encryption settings.
* @param path file path.
* @param algorithm encryption algorithm.
* @param kmsKeyArn full kms key.
*/
public static void assertEncrypted(S3AFileSystem fs,
final Path path,
final S3AEncryptionMethods algorithm,
final String kmsKeyArn)
throws IOException {
ObjectMetadata md = fs.getObjectMetadata(path);
String details = String.format(
"file %s with encryption algorithm %s and key %s",
path,
md.getSSEAlgorithm(),
md.getSSEAwsKmsKeyId());
switch(algorithm) {
case SSE_C:
assertNull("Metadata algorithm should have been null in "
+ details,
md.getSSEAlgorithm());
assertEquals("Wrong SSE-C algorithm in "
+ details,
SSE_C_ALGORITHM, md.getSSECustomerAlgorithm());
String md5Key = convertKeyToMd5(fs);
assertEquals("getSSECustomerKeyMd5() wrong in " + details,
md5Key, md.getSSECustomerKeyMd5());
break;
case SSE_KMS:
assertEquals("Wrong algorithm in " + details,
AWS_KMS_SSE_ALGORITHM, md.getSSEAlgorithm());
assertEquals("Wrong KMS key in " + details,
kmsKeyArn,
md.getSSEAwsKmsKeyId());
break;
default:
assertEquals("AES256", md.getSSEAlgorithm());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ protected S3AEncryptionMethods getSSEAlgorithm() {
@Override
protected void assertEncrypted(Path path) throws IOException {
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
assertEquals("aws:kms", md.getSSEAlgorithm());
assertEquals("SSE Algorithm", EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM,
md.getSSEAlgorithm());
assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
}
}
Loading