diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index 5fe410582ce5..e51ece76a3e3 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -19,6 +19,7 @@ package org.apache.iceberg.aws.s3; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -66,6 +67,7 @@ import software.amazon.awssdk.services.s3.model.Permission; import software.amazon.awssdk.services.s3.model.PutBucketVersioningRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; import software.amazon.awssdk.services.s3.model.VersioningConfiguration; import software.amazon.awssdk.services.s3control.S3ControlClient; @@ -510,6 +512,85 @@ public void testServerSideCustomEncryption() throws Exception { assertThat(response.sseCustomerKeyMD5()).isEqualTo(md5); } + private S3FileIO getS3FileIOInstanceWithAnalyticsAcceleratorAndServerSideCustomEncryptionEnabled() + throws Exception { + // generate key + KeyGenerator keyGenerator = KeyGenerator.getInstance("AES"); + keyGenerator.init(256, new SecureRandom()); + SecretKey secretKey = keyGenerator.generateKey(); + Base64.Encoder encoder = Base64.getEncoder(); + String encodedKey = new String(encoder.encode(secretKey.getEncoded()), StandardCharsets.UTF_8); + // generate md5 + MessageDigest digest = MessageDigest.getInstance("MD5"); + String md5 = + new String(encoder.encode(digest.digest(secretKey.getEncoded())), StandardCharsets.UTF_8); + + S3FileIO s3FileIO = new S3FileIO(); + s3FileIO.initialize( + ImmutableMap.of( + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, + String.valueOf(true), + S3FileIOProperties.SSE_TYPE, + S3FileIOProperties.SSE_TYPE_CUSTOM, + S3FileIOProperties.SSE_KEY, + encodedKey, + S3FileIOProperties.SSE_MD5, + md5)); + + return s3FileIO; + } + + @Test + public void testServerSideCustomEncryptionWithAnalyticsAcceleratorEnabled() throws Exception { + requireKMSEncryptionSupport(); + S3FileIO s3FileIO = + getS3FileIOInstanceWithAnalyticsAcceleratorAndServerSideCustomEncryptionEnabled(); + write(s3FileIO); + validateRead(s3FileIO); + } + + @Test + public void testCreateAndReadWithDifferentSSECKeyWithAnalyticsAcceleratorEnabled() + throws Exception { + requireKMSEncryptionSupport(); + S3FileIO s3FileIO = + getS3FileIOInstanceWithAnalyticsAcceleratorAndServerSideCustomEncryptionEnabled(); + write(s3FileIO); + S3FileIO s3FileIOWithDifferentKey = + getS3FileIOInstanceWithAnalyticsAcceleratorAndServerSideCustomEncryptionEnabled(); + + assertThatThrownBy(() -> validateRead(s3FileIOWithDifferentKey)) + .isInstanceOf(S3Exception.class) + .hasMessageContaining("Forbidden") + .extracting("statusCode") + .isEqualTo(403); + } + + @Test + public void testCreateAndReadWithEmptySSECKeyWithAnalyticsAcceleratorEnabled() throws Exception { + requireKMSEncryptionSupport(); + S3FileIO s3FileIO = + getS3FileIOInstanceWithAnalyticsAcceleratorAndServerSideCustomEncryptionEnabled(); + write(s3FileIO); + S3FileIO s3FileIOWithEmptyKey = new S3FileIO(); + s3FileIOWithEmptyKey.initialize( + ImmutableMap.of( + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, + String.valueOf(true), + S3FileIOProperties.SSE_TYPE, + S3FileIOProperties.SSE_TYPE_CUSTOM, + S3FileIOProperties.SSE_KEY, + "", + S3FileIOProperties.SSE_MD5, + "")); + + assertThatThrownBy(() -> validateRead(s3FileIOWithEmptyKey)) + .isInstanceOf(S3Exception.class) + .hasMessageContaining("Bad Request") + .extracting("statusCode") + .isEqualTo(400); + } + @Test public void testACL() throws Exception { requireACLSupport(); diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java index ed1c3a4879e3..81b09bab609f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java @@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; +import java.util.Optional; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.util.Pair; @@ -35,6 +36,7 @@ import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; @@ -59,14 +61,21 @@ private AnalyticsAcceleratorUtil() {} public static SeekableInputStream newStream(S3InputFile inputFile) { S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key()); HeadObjectResponse metadata = inputFile.getObjectMetadata(); - OpenStreamInformation openStreamInfo = - OpenStreamInformation.builder() - .objectMetadata( - ObjectMetadata.builder() - .contentLength(metadata.contentLength()) - .etag(metadata.eTag()) - .build()) - .build(); + OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder = + OpenStreamInformation.builder(); + openStreamInformationBuilder.objectMetadata( + ObjectMetadata.builder() + .contentLength(metadata.contentLength()) + .etag(metadata.eTag()) + .build()); + if (inputFile.s3FileIOProperties().sseType().equals(S3FileIOProperties.SSE_TYPE_CUSTOM)) { + openStreamInformationBuilder.encryptionSecrets( + EncryptionSecrets.builder() + .sseCustomerKey(Optional.of(inputFile.s3FileIOProperties().sseKey())) + .build()); + } + + OpenStreamInformation openStreamInfo = openStreamInformationBuilder.build(); S3SeekableInputStreamFactory factory = STREAM_FACTORY_CACHE.get(