Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/gradle-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ env:
S3_TEST_BUCKET : ${{ vars.S3_TEST_BUCKET }}
S3_TEST_PREFIX : ${{ vars.S3_TEST_PREFIX }}
ROLE_TO_ASSUME: ${{ secrets.S3_TEST_ASSUME_ROLE_ARN }}
CUSTOMER_KEY: ${{ secrets.CUSTOMER_KEY }}

jobs:
build:
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ When the `S3SeekableInputStreamFactory` is no longer required to create new stre
s3SeekableInputStreamFactory.close();
```

### Accessing SSE-C encrypted objects

To access SSE-C encrypted objects using AAL, set the customer key that was used to encrypt the object on an `OpenStreamInformation` object and pass that object to `createStream` when opening the stream. The customer key must be Base64 encoded.

```
OpenStreamInformation openStreamInformation =
OpenStreamInformation.builder()
.encryptionSecrets(
EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64EncodedCustomerKey)).build())
.build();

S3SeekableInputStream s3SeekableInputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key), openStreamInformation);

```

### Using with Hadoop

If you are using Analytics Accelerator Library for Amazon S3 with Hadoop, you need to set the stream type to `analytics` in the Hadoop configuration. An example configuration is as follows:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.s3.analyticsaccelerator.request;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Optional;
import lombok.Builder;
import lombok.Getter;

/**
 * Contains encryption secrets for Server-Side Encryption with Customer-Provided Keys (SSE-C). This
 * class manages the customer-provided encryption key used for SSE-C operations with Amazon S3.
 *
 * <p>Both fields are assigned once in the constructor and never change afterwards.
 */
@Getter
public class EncryptionSecrets {

  /**
   * The customer-provided encryption key for SSE-C operations. When present, this key will be used
   * for server-side encryption. The key must be Base64 encoded and exactly 256 bits (32 bytes) when
   * decoded; this class does not check the decoded length itself. Never {@code null}: an absent key
   * is represented by {@link Optional#empty()}.
   */
  private final Optional<String> ssecCustomerKey;

  /**
   * The Base64-encoded MD5 hash of the customer key. This hash is automatically calculated from the
   * customer key and is used by Amazon S3 to verify the integrity of the encryption key during
   * transmission. Will be null if no customer key is provided.
   */
  private final String ssecCustomerKeyMd5;

  /**
   * Constructs an EncryptionSecrets instance with the specified SSE-C customer key.
   *
   * <p>This constructor processes the SSE-C (Server-Side Encryption with Customer-Provided Keys)
   * encryption key and calculates its MD5 hash as required by Amazon S3. The process involves:
   *
   * <ol>
   *   <li>Accepting a Base64-encoded encryption key
   *   <li>Decoding the Base64 key back to bytes
   *   <li>Computing the MD5 hash of these bytes
   *   <li>Encoding the MD5 hash in Base64 format
   * </ol>
   *
   * @param sseCustomerKey An Optional containing the Base64-encoded encryption key, or empty if no
   *     encryption is needed. A {@code null} reference is treated the same as an empty Optional.
   * @throws IllegalArgumentException if the supplied key is not valid Base64
   * @throws IllegalStateException if the MD5 algorithm is not available on this JVM
   */
  @Builder
  public EncryptionSecrets(Optional<String> sseCustomerKey) {
    // The generated builder hands over null when sseCustomerKey(...) was never called, so
    // normalize it here instead of failing with a NullPointerException on the map() below.
    this.ssecCustomerKey = sseCustomerKey == null ? Optional.empty() : sseCustomerKey;
    this.ssecCustomerKeyMd5 =
        this.ssecCustomerKey.map(EncryptionSecrets::base64Md5OfKey).orElse(null);
  }

  /**
   * Decodes the Base64 key, hashes the raw bytes with MD5 and returns the digest Base64 encoded.
   *
   * @param base64Key the Base64-encoded customer key
   * @return the Base64-encoded MD5 digest of the decoded key bytes
   */
  private static String base64Md5OfKey(String base64Key) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] rawKey = Base64.getDecoder().decode(base64Key);
      return Base64.getEncoder().encodeToString(md5.digest(rawKey));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 algorithm not available", e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;

Expand All @@ -41,6 +42,7 @@ public class OpenStreamInformation {
private final StreamAuditContext streamAuditContext;
private final ObjectMetadata objectMetadata;
private final InputPolicy inputPolicy;
private final EncryptionSecrets encryptionSecrets;

/** Default set of settings for {@link OpenStreamInformation} */
public static final OpenStreamInformation DEFAULT = OpenStreamInformation.builder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@

import static org.junit.jupiter.api.Assertions.*;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;

public class OpenStreamInformationTest {

private static final String CUSTOMER_KEY = "32-bytes-long-key-for-testing-123";

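/**
 * Expected Base64-encoded MD5 of the {@code CUSTOMER_KEY} bytes. To generate the Base64-encoded
 * MD5 value for a customer key, use the CLI command:
 * echo -n "customer_key" | base64 | base64 -d | openssl md5 -binary | base64
 *
 * <p>NOTE(review): the {@code CUSTOMER_KEY} literal is 33 characters long, not 32 as its text
 * suggests. That is harmless for these hash-only assertions, but it is not a valid 256-bit SSE-C
 * key; changing it requires regenerating this value.
 */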
private static final String EXPECTED_BASE64_MD5 = "R+k8pqEVUmkxDfaH5MqIdw==";

@Test
public void testDefaultInstance() {
OpenStreamInformation info = OpenStreamInformation.DEFAULT;
Expand All @@ -32,24 +44,36 @@ public void testDefaultInstance() {
assertNull(info.getStreamAuditContext(), "Default streamContext should be null");
assertNull(info.getObjectMetadata(), "Default objectMetadata should be null");
assertNull(info.getInputPolicy(), "Default inputPolicy should be null");
assertNull(info.getEncryptionSecrets(), "Default encryptionSecrets should be null");
}

/** Builds an instance with every field set and checks that each getter returns what was given. */
@Test
public void testBuilderWithAllFields() {
  // Mocked collaborators: only identity is asserted on them below.
  StreamAuditContext auditContext = Mockito.mock(StreamAuditContext.class);
  ObjectMetadata metadata = Mockito.mock(ObjectMetadata.class);
  InputPolicy policy = Mockito.mock(InputPolicy.class);

  // SSE-C secrets built from the Base64 form of the test key.
  byte[] rawKey = CUSTOMER_KEY.getBytes(StandardCharsets.UTF_8);
  String encodedKey = Base64.getEncoder().encodeToString(rawKey);
  EncryptionSecrets encryptionSecrets =
      EncryptionSecrets.builder().sseCustomerKey(Optional.of(encodedKey)).build();

  OpenStreamInformation openStreamInformation =
      OpenStreamInformation.builder()
          .streamAuditContext(auditContext)
          .objectMetadata(metadata)
          .inputPolicy(policy)
          .encryptionSecrets(encryptionSecrets)
          .build();

  assertSame(
      auditContext, openStreamInformation.getStreamAuditContext(), "StreamContext should match");
  assertSame(metadata, openStreamInformation.getObjectMetadata(), "ObjectMetadata should match");
  assertSame(policy, openStreamInformation.getInputPolicy(), "InputPolicy should match");

  EncryptionSecrets actualSecrets = openStreamInformation.getEncryptionSecrets();
  assertEquals(encodedKey, actualSecrets.getSsecCustomerKey().get(), "Customer key should match");
  String actualMd5 = actualSecrets.getSsecCustomerKeyMd5();
  assertNotNull(actualMd5, "MD5 should not be null");
  assertEquals(EXPECTED_BASE64_MD5, actualMd5);
}

@Test
Expand Down Expand Up @@ -103,4 +127,47 @@ public void testNullFields() {
assertNull(info.getObjectMetadata(), "ObjectMetadata should be null");
assertNull(info.getInputPolicy(), "InputPolicy should be null");
}

/** Checks that {@link OpenStreamInformation#DEFAULT} carries no encryption secrets. */
@Test
public void testDefaultInstanceEncryptionSecrets() {
  assertNull(
      OpenStreamInformation.DEFAULT.getEncryptionSecrets(),
      "Default encryptionSecrets should be null");
}

/** Sets only the encryption secrets on the builder and checks the key and its MD5 are exposed. */
@Test
public void testBuilderWithEncryptionSecrets() {
  // Base64 form of the sample customer key
  byte[] rawKey = CUSTOMER_KEY.getBytes(StandardCharsets.UTF_8);
  String encodedKey = Base64.getEncoder().encodeToString(rawKey);

  EncryptionSecrets encryptionSecrets =
      EncryptionSecrets.builder().sseCustomerKey(Optional.of(encodedKey)).build();
  OpenStreamInformation openStreamInformation =
      OpenStreamInformation.builder().encryptionSecrets(encryptionSecrets).build();

  EncryptionSecrets actualSecrets = openStreamInformation.getEncryptionSecrets();
  assertNotNull(actualSecrets, "EncryptionSecrets should not be null");

  Optional<String> actualKey = actualSecrets.getSsecCustomerKey();
  assertTrue(actualKey.isPresent(), "Customer key should be present");
  assertEquals(encodedKey, actualKey.get(), "Customer key should match");

  String actualMd5 = actualSecrets.getSsecCustomerKeyMd5();
  assertNotNull(actualMd5, "MD5 should not be null");
  assertEquals(EXPECTED_BASE64_MD5, actualMd5);
}

/** Builds secrets from an empty Optional and checks that neither a key nor an MD5 is exposed. */
@Test
public void testBuilderWithEmptyEncryptionSecrets() {
  EncryptionSecrets emptySecrets =
      EncryptionSecrets.builder().sseCustomerKey(Optional.empty()).build();
  OpenStreamInformation openStreamInformation =
      OpenStreamInformation.builder().encryptionSecrets(emptySecrets).build();

  EncryptionSecrets actualSecrets = openStreamInformation.getEncryptionSecrets();
  assertNotNull(actualSecrets, "EncryptionSecrets should not be null");
  assertFalse(actualSecrets.getSsecCustomerKey().isPresent(), "Customer key should be empty");
  assertNull(actualSecrets.getSsecCustomerKeyMd5(), "MD5 should be null for empty key");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/** Base class for the integration tests */
Expand Down Expand Up @@ -99,7 +100,11 @@ protected void testAndCompareStreamReadPattern(
// Read using the standard S3 async client
Crc32CChecksum directChecksum = new Crc32CChecksum();
executeReadPatternDirectly(
s3ClientKind, s3Object, streamReadPattern, Optional.of(directChecksum));
s3ClientKind,
s3Object,
streamReadPattern,
Optional.of(directChecksum),
OpenStreamInformation.DEFAULT);

// Read using the AAL S3
Crc32CChecksum aalChecksum = new Crc32CChecksum();
Expand All @@ -108,7 +113,8 @@ protected void testAndCompareStreamReadPattern(
s3Object,
streamReadPattern,
AALInputStreamConfigurationKind,
Optional.of(aalChecksum));
Optional.of(aalChecksum),
OpenStreamInformation.DEFAULT);

// Assert checksums
assertChecksums(directChecksum, aalChecksum);
Expand Down Expand Up @@ -140,7 +146,8 @@ protected void testChangingEtagMidStream(
S3URI s3URI =
s3Object.getObjectUri(this.getS3ExecutionContext().getConfiguration().getBaseUri());
S3AsyncClient s3Client = this.getS3ExecutionContext().getS3Client();
S3SeekableInputStream stream = s3AALClientStreamReader.createReadStream(s3Object);
S3SeekableInputStream stream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could've saved yourself lots of time keeping createReadStream(s3Object) method around and inside call createReadStream(s3Object, OpenStreamInformation.DEFAULT).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nit btw but probably a better way to implement this pattern.


// Read first 100 bytes
readAndAssert(stream, buffer, 0, 100);
Expand Down Expand Up @@ -171,7 +178,11 @@ protected void testChangingEtagMidStream(
assertDoesNotThrow(
() ->
executeReadPatternOnAAL(
s3Object, s3AALClientStreamReader, streamReadPattern, Optional.of(datChecksum)));
s3Object,
s3AALClientStreamReader,
streamReadPattern,
Optional.of(datChecksum),
OpenStreamInformation.DEFAULT));
assert (datChecksum.getChecksumBytes().length > 0);
}
}
Expand Down Expand Up @@ -199,7 +210,7 @@ protected void testReadVectored(
this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {

S3SeekableInputStream s3SeekableInputStream =
s3AALClientStreamReader.createReadStream(s3Object);
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);

List<ObjectRange> objectRanges = new ArrayList<>();
objectRanges.add(new ObjectRange(new CompletableFuture<>(), 50, 500));
Expand All @@ -217,7 +228,7 @@ protected void testReadVectored(
ByteBuffer byteBuffer = objectRange.getByteBuffer().join();

S3SeekableInputStream verificationStream =
s3AALClientStreamReader.createReadStream(s3Object);
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
verificationStream.seek(objectRange.getOffset());
byte[] buffer = new byte[objectRange.getLength()];
int readBytes = verificationStream.read(buffer, 0, buffer.length);
Expand Down Expand Up @@ -273,7 +284,8 @@ protected void testChangingEtagAfterStreamPassesAndReturnsCachedObject(
// Create the s3DATClientStreamReader - that creates the shared state
try (S3AALClientStreamReader s3AALClientStreamReader =
this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {
S3SeekableInputStream stream = s3AALClientStreamReader.createReadStream(s3Object);
S3SeekableInputStream stream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
Crc32CChecksum datChecksum = calculateCRC32C(stream, bufferSize);

S3URI s3URI =
Expand All @@ -287,7 +299,8 @@ protected void testChangingEtagAfterStreamPassesAndReturnsCachedObject(
AsyncRequestBody.fromBytes(generateRandomBytes(bufferSize)))
.join();

S3SeekableInputStream cacheStream = s3AALClientStreamReader.createReadStream(s3Object);
S3SeekableInputStream cacheStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
Crc32CChecksum cachedChecksum = calculateCRC32C(cacheStream, bufferSize);

// Assert checksums
Expand Down Expand Up @@ -351,7 +364,11 @@ protected void testAALReadConcurrency(
// Read using the standard S3 async client. We do this once, to calculate the checksums
Crc32CChecksum directChecksum = new Crc32CChecksum();
executeReadPatternDirectly(
s3ClientKind, s3Object, streamReadPattern, Optional.of(directChecksum));
s3ClientKind,
s3Object,
streamReadPattern,
Optional.of(directChecksum),
OpenStreamInformation.DEFAULT);

// Create the s3DATClientStreamReader - that creates the shared state
try (S3AALClientStreamReader s3AALClientStreamReader =
Expand All @@ -374,7 +391,8 @@ protected void testAALReadConcurrency(
s3Object,
s3AALClientStreamReader,
streamReadPattern,
Optional.of(datChecksum));
Optional.of(datChecksum),
OpenStreamInformation.DEFAULT);

// Assert checksums
assertChecksums(directChecksum, datChecksum);
Expand Down Expand Up @@ -418,7 +436,8 @@ protected void testSmallObjectPrefetching(
this.createS3AALClientStreamReader(s3ClientKind, AALInputStreamConfigurationKind)) {

// First stream
S3SeekableInputStream stream = s3AALClientStreamReader.createReadStream(s3Object);
S3SeekableInputStream stream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
Crc32CChecksum firstChecksum = calculateCRC32C(stream, (int) s3Object.getSize());

S3URI s3URI =
Expand All @@ -433,7 +452,8 @@ protected void testSmallObjectPrefetching(
.join();

// Create second stream
S3SeekableInputStream secondStream = s3AALClientStreamReader.createReadStream(s3Object);
S3SeekableInputStream secondStream =
s3AALClientStreamReader.createReadStream(s3Object, OpenStreamInformation.DEFAULT);
Crc32CChecksum secondChecksum = calculateCRC32C(secondStream, (int) s3Object.getSize());

if (s3Object.getSize() < 8 * ONE_MB) {
Expand Down
Loading
Loading