diff --git a/lib/trino-filesystem-s3/pom.xml b/lib/trino-filesystem-s3/pom.xml index 9e73e75f4049..0bf6f8d83775 100644 --- a/lib/trino-filesystem-s3/pom.xml +++ b/lib/trino-filesystem-s3/pom.xml @@ -303,7 +303,7 @@ maven-surefire-plugin - **/TestS3FileSystemAwsS3.java + **/TestS3FileSystemAwsS3*.java @@ -320,7 +320,7 @@ maven-surefire-plugin - **/TestS3FileSystemAwsS3.java + **/TestS3FileSystemAwsS3*.java diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java index c4526e7186f7..c0a31e3aed43 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java @@ -23,6 +23,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.UriLocation; +import io.trino.filesystem.encryption.EncryptionKey; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CommonPrefix; @@ -56,6 +57,8 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.partition; import static com.google.common.collect.Multimaps.toMultimap; +import static io.trino.filesystem.s3.S3SseCUtils.encoded; +import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; @@ -80,25 +83,49 @@ public S3FileSystem(Executor uploadExecutor, S3Client client, S3Presigner preSig @Override public TrinoInputFile newInputFile(Location location) { - return new S3InputFile(client, context, new S3Location(location), null, null); + return new S3InputFile(client, context, new S3Location(location), null, null, Optional.empty()); } @Override public TrinoInputFile newInputFile(Location location, long length) { - return new S3InputFile(client, context, new S3Location(location), length, null); + return new S3InputFile(client, context, new S3Location(location), length, null, Optional.empty()); } @Override public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) { - return new S3InputFile(client, context, new S3Location(location), length, lastModified); + return new S3InputFile(client, context, new S3Location(location), length, lastModified, Optional.empty()); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key) + { + return new S3InputFile(client, context, new S3Location(location), null, null, Optional.of(key)); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key) + { + return new S3InputFile(client, context, new S3Location(location), length, null, Optional.of(key)); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key) + { + return new S3InputFile(client, context, new S3Location(location), length, lastModified, Optional.of(key)); } @Override public TrinoOutputFile newOutputFile(Location location) { - return new S3OutputFile(uploadExecutor, client, context, new S3Location(location)); + return new S3OutputFile(uploadExecutor, client, context, new S3Location(location), Optional.empty()); + } + + @Override + public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key) + { + return new S3OutputFile(uploadExecutor, client, context, new S3Location(location), Optional.of(key)); } @Override @@ -295,6 +322,19 @@ public Optional createTemporaryDirectory(Location targetPath, String t @Override public Optional preSignedUri(Location location, Duration ttl) throws IOException + { + return encryptedPreSignedUri(location, ttl, Optional.empty()); + } + + @Override + public Optional encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key) + throws IOException + { + return encryptedPreSignedUri(location, ttl, Optional.of(key)); + } + + public Optional encryptedPreSignedUri(Location location, Duration ttl, Optional key) + throws IOException { location.verifyValidFileLocation(); S3Location s3Location = new S3Location(location); @@ -304,6 +344,11 @@ public Optional preSignedUri(Location location, Duration ttl) .requestPayer(requestPayer) .key(s3Location.key()) .bucket(s3Location.bucket()) + .applyMutation(builder -> key.ifPresent(encryption -> { + builder.sseCustomerKeyMD5(md5Checksum(encryption)); + builder.sseCustomerAlgorithm(encryption.algorithm()); + builder.sseCustomerKey(encoded(encryption)); + })) .build(); GetObjectPresignRequest preSignRequest = GetObjectPresignRequest.builder() diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java index c7a70da308ed..b6bddd2d88f0 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3InputFile.java @@ -18,6 +18,7 @@ import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoInputStream; +import io.trino.filesystem.encryption.EncryptionKey; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -29,7 +30,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.time.Instant; +import java.util.Optional; +import static io.trino.filesystem.s3.S3SseCUtils.encoded; +import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum; import static java.util.Objects.requireNonNull; final class S3InputFile @@ -39,10 +43,11 @@ final class S3InputFile private final S3Location location; private final S3Context context; private final RequestPayer requestPayer; + private final Optional key; private Long length; private Instant lastModified; - public S3InputFile(S3Client client, S3Context context, S3Location location, Long length, Instant lastModified) + public S3InputFile(S3Client client, S3Context context, S3Location location, Long length, Instant lastModified, Optional key) { this.client = requireNonNull(client, "client is null"); this.location = requireNonNull(location, "location is null"); @@ -50,6 +55,7 @@ public S3InputFile(S3Client client, S3Context context, S3Location location, Long this.requestPayer = context.requestPayer(); this.length = length; this.lastModified = lastModified; + this.key = requireNonNull(key, "key is null"); location.location().verifyValidFileLocation(); } @@ -105,6 +111,11 @@ private GetObjectRequest newGetObjectRequest() .requestPayer(requestPayer) .bucket(location.bucket()) .key(location.key()) + .applyMutation(builder -> key.ifPresent(encryption -> { + builder.sseCustomerKey(encoded(encryption)); + builder.sseCustomerAlgorithm(encryption.algorithm()); + builder.sseCustomerKeyMD5(md5Checksum(encryption)); + })) .build(); } @@ -116,6 +127,11 @@ private boolean headObject() .requestPayer(requestPayer) .bucket(location.bucket()) .key(location.key()) + .applyMutation(builder -> key.ifPresent(encryption -> { + builder.sseCustomerKey(encoded(encryption)); + builder.sseCustomerAlgorithm(encryption.algorithm()); + builder.sseCustomerKeyMD5(md5Checksum(encryption)); + })) .build(); try { diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java index a01b2cf3090e..d7af4fb965cd 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputFile.java @@ -15,11 +15,13 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoOutputFile; +import io.trino.filesystem.encryption.EncryptionKey; import io.trino.memory.context.AggregatedMemoryContext; import software.amazon.awssdk.services.s3.S3Client; import java.io.IOException; import java.io.OutputStream; +import java.util.Optional; import java.util.concurrent.Executor; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -32,13 +34,15 @@ final class S3OutputFile private final S3Client client; private final S3Context context; private final S3Location location; + private final Optional key; - public S3OutputFile(Executor uploadExecutor, S3Client client, S3Context context, S3Location location) + public S3OutputFile(Executor uploadExecutor, S3Client client, S3Context context, S3Location location, Optional key) { this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null"); this.client = requireNonNull(client, "client is null"); this.context = requireNonNull(context, "context is null"); this.location = requireNonNull(location, "location is null"); + this.key = requireNonNull(key, "key is null"); location.location().verifyValidFileLocation(); } @@ -72,7 +76,7 @@ public OutputStream create(AggregatedMemoryContext memoryContext) public OutputStream create(AggregatedMemoryContext memoryContext, boolean exclusive) { - return new S3OutputStream(memoryContext, uploadExecutor, client, context, location, exclusive); + return new S3OutputStream(memoryContext, uploadExecutor, client, context, location, exclusive, key); } @Override diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java index 65d0b87c5b31..8de755f614e5 100644 --- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3OutputStream.java @@ -13,6 +13,7 @@ */ package io.trino.filesystem.s3; +import io.trino.filesystem.encryption.EncryptionKey; import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.memory.context.LocalMemoryContext; @@ -43,7 +44,10 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; +import static com.google.common.base.Verify.verify; import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl; +import static io.trino.filesystem.s3.S3SseCUtils.encoded; +import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum; import static java.lang.Math.clamp; import static java.lang.Math.max; import static java.lang.Math.min; @@ -69,6 +73,7 @@ final class S3OutputStream private final String sseKmsKeyId; private final ObjectCannedACL cannedAcl; private final boolean exclusiveCreate; + private final Optional key; private int currentPartNumber; private byte[] buffer = new byte[0]; @@ -85,7 +90,7 @@ final class S3OutputStream // Visibility is ensured by calling get() on inProgressUploadFuture. private Optional uploadId = Optional.empty(); - public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location, boolean exclusiveCreate) + public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location, boolean exclusiveCreate, Optional key) { this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName()); this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null"); @@ -98,6 +103,9 @@ public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExec this.sseType = context.sseType(); this.sseKmsKeyId = context.sseKmsKeyId(); this.cannedAcl = getCannedAcl(context.cannedAcl()); + this.key = requireNonNull(key, "key is null"); + + verify(key.isEmpty() || sseType == S3SseType.NONE, "Encryption key cannot be used with sse configuration"); } @SuppressWarnings("NumericCastThatLosesPrecision") @@ -216,6 +224,11 @@ private void flushBuffer(boolean finished) if (exclusiveCreate) { builder.ifNoneMatch("*"); } + key.ifPresent(encryption -> { + builder.sseCustomerKey(encoded(encryption)); + builder.sseCustomerAlgorithm(encryption.algorithm()); + builder.sseCustomerKeyMD5(md5Checksum(encryption)); + }); switch (sseType) { case NONE -> { /* ignored */ } case S3 -> builder.serverSideEncryption(AES256); @@ -301,6 +314,11 @@ private CompletedPart uploadPage(byte[] data, int length) .bucket(location.bucket()) .key(location.key()) .applyMutation(builder -> { + key.ifPresent(encryption -> { + builder.sseCustomerKey(encoded(encryption)); + builder.sseCustomerAlgorithm(encryption.algorithm()); + builder.sseCustomerKeyMD5(md5Checksum(encryption)); + }); switch (sseType) { case NONE -> { /* ignored */ } case S3 -> builder.serverSideEncryption(AES256); @@ -321,6 +339,11 @@ private CompletedPart uploadPage(byte[] data, int length) .contentLength((long) length) .uploadId(uploadId.get()) .partNumber(currentPartNumber) + .applyMutation(builder -> key.ifPresent(encryption -> { + builder.sseCustomerKey(encoded(encryption)); + builder.sseCustomerAlgorithm(encryption.algorithm()); + builder.sseCustomerKeyMD5(md5Checksum(encryption)); + })) .build(); ByteBuffer bytes = ByteBuffer.wrap(data, 0, length); @@ -346,6 +369,11 @@ private void finishUpload(String uploadId) .uploadId(uploadId) .multipartUpload(x -> x.parts(parts)) .applyMutation(builder -> { + key.ifPresent(encodingKey -> { + builder.sseCustomerKey(encoded(encodingKey)); + builder.sseCustomerAlgorithm(encodingKey.algorithm()); + builder.sseCustomerKeyMD5(md5Checksum(encodingKey)); + }); if (exclusiveCreate) { builder.ifNoneMatch("*"); } diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SseCUtils.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SseCUtils.java new file mode 100644 index 000000000000..47d1975132b8 --- /dev/null +++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3SseCUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.s3; + +import io.trino.filesystem.encryption.EncryptionKey; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +final class S3SseCUtils +{ + private S3SseCUtils() {} + + public static String encoded(EncryptionKey key) + { + return Base64.getEncoder().encodeToString(key.key()); + } + + public static String md5Checksum(EncryptionKey key) + { + try { + MessageDigest digest = MessageDigest.getInstance("MD5"); + digest.update(key.key()); + return Base64.getEncoder().encodeToString(digest.digest()); + } + catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } +} diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java index 35a8106e01de..d0e60dcbceaf 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/AbstractTestS3FileSystem.java @@ -24,26 +24,35 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoInputStream; +import io.trino.filesystem.encryption.EncryptionEnforcingFileSystem; +import io.trino.filesystem.encryption.EncryptionKey; import io.trino.spi.security.ConnectorIdentity; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.io.Closeable; import java.io.IOException; import java.util.List; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.filesystem.encryption.EncryptionKey.randomAes256; +import static io.trino.filesystem.s3.S3SseCUtils.encoded; +import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.services.s3.model.ServerSideEncryption.AES256; public abstract class AbstractTestS3FileSystem extends AbstractTestTrinoFileSystem { + protected final EncryptionKey randomEncryptionKey = randomAes256(); private S3FileSystemFactory fileSystemFactory; private TrinoFileSystem fileSystem; @@ -74,6 +83,9 @@ protected final boolean isHierarchical() @Override protected final TrinoFileSystem getFileSystem() { + if (useServerSideEncryptionWithCustomerKey()) { + return new EncryptionEnforcingFileSystem(fileSystem, randomEncryptionKey); + } return fileSystem; } @@ -134,19 +146,30 @@ void testFileWithTrailingWhitespaceAgainstNativeClient() try (S3Client s3Client = createS3Client()) { String key = "foo/bar with whitespace "; byte[] contents = "abc foo bar".getBytes(UTF_8); - s3Client.putObject( - request -> request.bucket(bucket()).key(key), - RequestBody.fromBytes(contents.clone())); + + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(bucket()) + .key(key) + .applyMutation(builder -> { + if (useServerSideEncryptionWithCustomerKey()) { + builder.sseCustomerAlgorithm(AES256.toString()); + builder.sseCustomerKey(encoded(randomEncryptionKey)); + builder.sseCustomerKeyMD5(md5Checksum(randomEncryptionKey)); + } + }) + .build(); + + s3Client.putObject(putObjectRequest, RequestBody.fromBytes(contents.clone())); try { // Verify listing - List listing = toList(fileSystem.listFiles(getRootLocation().appendPath("foo"))); + List listing = toList(getFileSystem().listFiles(getRootLocation().appendPath("foo"))); assertThat(listing).hasSize(1); FileEntry fileEntry = getOnlyElement(listing); assertThat(fileEntry.location()).isEqualTo(getRootLocation().appendPath(key)); assertThat(fileEntry.length()).isEqualTo(contents.length); // Verify reading - TrinoInputFile inputFile = fileSystem.newInputFile(fileEntry.location()); + TrinoInputFile inputFile = getFileSystem().newInputFile(fileEntry.location()); assertThat(inputFile.exists()).as("exists").isTrue(); try (TrinoInputStream inputStream = inputFile.newStream()) { byte[] bytes = ByteStreams.toByteArray(inputStream); @@ -155,12 +178,24 @@ void testFileWithTrailingWhitespaceAgainstNativeClient() // Verify writing byte[] newContents = "bar bar baz new content".getBytes(UTF_8); - fileSystem.newOutputFile(fileEntry.location()).createOrOverwrite(newContents); - assertThat(s3Client.getObjectAsBytes(request -> request.bucket(bucket()).key(key)).asByteArray()) + getFileSystem().newOutputFile(fileEntry.location()).createOrOverwrite(newContents); + GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket()) + .key(key) + .applyMutation(builder -> { + if (useServerSideEncryptionWithCustomerKey()) { + builder.sseCustomerAlgorithm(AES256.toString()); + builder.sseCustomerKey(encoded(randomEncryptionKey)); + builder.sseCustomerKeyMD5(md5Checksum(randomEncryptionKey)); + } + }) + .build(); + + assertThat(s3Client.getObjectAsBytes(request).asByteArray()) .isEqualTo(newContents); // Verify deleting - fileSystem.deleteFile(fileEntry.location()); + getFileSystem().deleteFile(fileEntry.location()); assertThat(inputFile.exists()).as("exists after delete").isFalse(); } finally { @@ -176,17 +211,17 @@ void testExistingDirectoryWithTrailingSlash() try (S3Client s3Client = createS3Client(); Closer closer = Closer.create()) { String key = "data/dir/"; createDirectory(closer, s3Client, key); - assertThat(fileSystem.listFiles(getRootLocation()).hasNext()).isFalse(); + assertThat(getFileSystem().listFiles(getRootLocation()).hasNext()).isFalse(); Location data = getRootLocation().appendPath("data/"); - assertThat(fileSystem.listDirectories(getRootLocation())).containsExactly(data); - assertThat(fileSystem.listDirectories(data)).containsExactly(data.appendPath("dir/")); + assertThat(getFileSystem().listDirectories(getRootLocation())).containsExactly(data); + assertThat(getFileSystem().listDirectories(data)).containsExactly(data.appendPath("dir/")); - fileSystem.deleteDirectory(data); - assertThat(fileSystem.listDirectories(getRootLocation())).isEmpty(); + getFileSystem().deleteDirectory(data); + assertThat(getFileSystem().listDirectories(getRootLocation())).isEmpty(); - fileSystem.deleteDirectory(getRootLocation()); - assertThat(fileSystem.listDirectories(getRootLocation())).isEmpty(); + getFileSystem().deleteDirectory(getRootLocation()); + assertThat(getFileSystem().listDirectories(getRootLocation())).isEmpty(); } } @@ -202,16 +237,16 @@ void testDeleteEmptyDirectoryWithDeepHierarchy() createDirectory(closer, s3Client, "deep/dir/dir4"); createBlob(closer, "deep/dir/dir4/file5.txt"); - assertThat(fileSystem.listFiles(getRootLocation()).hasNext()).isTrue(); + assertThat(getFileSystem().listFiles(getRootLocation()).hasNext()).isTrue(); Location directory = getRootLocation().appendPath("deep/dir/"); - assertThat(fileSystem.listDirectories(getRootLocation().appendPath("deep"))).containsExactly(directory); - assertThat(fileSystem.listDirectories(directory)).containsExactly(getRootLocation().appendPath("deep/dir/dir4/")); + assertThat(getFileSystem().listDirectories(getRootLocation().appendPath("deep"))).containsExactly(directory); + assertThat(getFileSystem().listDirectories(directory)).containsExactly(getRootLocation().appendPath("deep/dir/dir4/")); - fileSystem.deleteDirectory(directory); - assertThat(fileSystem.listDirectories(getRootLocation().appendPath("deep"))).isEmpty(); - assertThat(fileSystem.listDirectories(getRootLocation())).isEmpty(); - assertThat(fileSystem.listFiles(getRootLocation()).hasNext()).isFalse(); + getFileSystem().deleteDirectory(directory); + assertThat(getFileSystem().listDirectories(getRootLocation().appendPath("deep"))).isEmpty(); + assertThat(getFileSystem().listDirectories(getRootLocation())).isEmpty(); + assertThat(getFileSystem().listFiles(getRootLocation()).hasNext()).isFalse(); } } diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3WithEncryption.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3WithEncryption.java new file mode 100644 index 000000000000..a61dbccac9ad --- /dev/null +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemAwsS3WithEncryption.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.s3; + +public class TestS3FileSystemAwsS3WithEncryption + extends TestS3FileSystemAwsS3 +{ + @Override + protected boolean useServerSideEncryptionWithCustomerKey() + { + return true; + } +} diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStackWithEncryption.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStackWithEncryption.java new file mode 100644 index 000000000000..83db1e6d424e --- /dev/null +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3FileSystemLocalStackWithEncryption.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.s3; + +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TestS3FileSystemLocalStackWithEncryption + extends TestS3FileSystemLocalStack +{ + @Override + protected boolean useServerSideEncryptionWithCustomerKey() + { + return true; + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index 51cefdb8c9f3..521fa3902631 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -15,6 +15,7 @@ import com.google.common.base.Throwables; import io.airlift.units.Duration; +import io.trino.filesystem.encryption.EncryptionKey; import java.io.FileNotFoundException; import java.io.IOException; @@ -63,6 +64,18 @@ public interface TrinoFileSystem */ TrinoInputFile newInputFile(Location location); + /** + * Creates an encrypted TrinoInputFile which can be used to read the encrypted file data. + * The file location path cannot be empty, and must not end with a slash or whitespace. + * + * @throws IllegalArgumentException if location is not valid for this file system + * @throws UnsupportedOperationException if server side encryption is not supported + */ + default TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key) + { + throw new UnsupportedOperationException("Server side encryption is not supported"); + } + /** * Creates a TrinoInputFile with a predeclared length which can be used to read the file data. * The length will be returned from {@link TrinoInputFile#length()} and the actual file length @@ -73,6 +86,20 @@ public interface TrinoFileSystem */ TrinoInputFile newInputFile(Location location, long length); + /** + * Creates an encrypted TrinoInputFile with a predeclared length which can be used to read + * the file encrypted data. The length will be returned from {@link TrinoInputFile#length()} and + * the actual file length will never be checked. + * The file location path cannot be empty, and must not end with a slash or whitespace. + * + * @throws IllegalArgumentException if location is not valid for this file system + * @throws UnsupportedOperationException if server side encryption is not supported + */ + default TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key) + { + throw new UnsupportedOperationException("Server side encryption is not supported"); + } + /** * Creates a TrinoInputFile with a predeclared length and lastModifiedTime which can be used to read the file data. * The length will be returned from {@link TrinoInputFile#length()} and the actual file length @@ -84,6 +111,21 @@ public interface TrinoFileSystem */ TrinoInputFile newInputFile(Location location, long length, Instant lastModified); + /** + * Creates an encrypted TrinoInputFile with a predeclared length and lastModifiedTime which can be used to read + * the encrypted file data. The length will be returned from {@link TrinoInputFile#length()} and the actual file + * length will never be checked. The lastModified will be returned from {@link TrinoInputFile#lastModified()} + * and the actual file last modified time will never be checked. + * The file location path cannot be empty, and must not end with a slash or whitespace. + * + * @throws IllegalArgumentException if location is not valid for this file system + * @throws UnsupportedOperationException if server side encryption is not supported + */ + default TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key) + { + throw new UnsupportedOperationException("Server side encryption is not supported"); + } + /** * Creates a TrinoOutputFile which can be used to create or overwrite the file. The file * location path cannot be empty, and must not end with a slash or whitespace. @@ -92,6 +134,18 @@ public interface TrinoFileSystem */ TrinoOutputFile newOutputFile(Location location); + /** + * Creates an encrypted TrinoOutputFile which can be used to create or overwrite the file. + * The file location path cannot be empty, and must not end with a slash or whitespace. + * + * @throws IllegalArgumentException if location is not valid for this file system + * @throws UnsupportedOperationException if server side encryption is not supported + */ + default TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key) + { + throw new UnsupportedOperationException("Server side encryption is not supported"); + } + /** * Deletes the specified file. The file location path cannot be empty, and must not end with * a slash or whitespace. If the file is a directory, an exception is raised. If the file does @@ -258,6 +312,22 @@ default Optional preSignedUri(Location location, Duration ttl) throw new UnsupportedOperationException("Pre-signed URIs are not supported by " + getClass().getSimpleName()); } + /** + * Returns the direct encrypted pre-signed URI location for the given storage location. + *

+ * Pre-signed URIs allow for retrieval of the files directly from the storage location. + * This is useful for large files where the server would be a bottleneck. + * + * @throws UnsupportedOperationException if the pre-signed URIs are not supported + * @return the pre-signed URI to the storage location or `Optional.empty()` + * if pre-signed URI cannot be generated. + */ + default Optional encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key) + throws IOException + { + throw new UnsupportedOperationException("Encrypted pre-signed URIs are not supported by " + getClass().getSimpleName()); + } + /** * Checks whether given exception is unrecoverable, so that further retries won't help *

diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/encryption/EncryptionEnforcingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/encryption/EncryptionEnforcingFileSystem.java new file mode 100644 index 000000000000..e832bdb48206 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/encryption/EncryptionEnforcingFileSystem.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.encryption; + +import io.airlift.units.Duration; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.filesystem.UriLocation; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +/** + * File system implementation that enforces encrypted file system calls. + */ +public class EncryptionEnforcingFileSystem + implements TrinoFileSystem +{ + private final TrinoFileSystem delegate; + private final EncryptionKey key; + + public EncryptionEnforcingFileSystem(TrinoFileSystem delegate, EncryptionKey key) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.key = requireNonNull(key, "key is null"); + } + + @Override + public TrinoInputFile newInputFile(Location location) + { + return newEncryptedInputFile(location, key); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key) + { + checkArgument(this.key.equals(key), "Provided key is not the same as the class encryption key"); + return delegate.newEncryptedInputFile(location, key); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length) + { + return delegate.newEncryptedInputFile(location, length, key); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key) + { + checkArgument(this.key.equals(key), "Provided key is not the same as the class encryption key"); + return delegate.newEncryptedInputFile(location, length, key); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return delegate.newEncryptedInputFile(location, length, lastModified, key); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key) + { + checkArgument(this.key.equals(key), "Provided key is not the same as the class encryption key"); + return delegate.newEncryptedInputFile(location, length, key); + } + + @Override + public TrinoOutputFile newOutputFile(Location location) + { + return delegate.newEncryptedOutputFile(location, key); + } + + @Override + public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key) + { + checkArgument(this.key.equals(key), "Provided key is not the same as the class encryption key"); + return delegate.newEncryptedOutputFile(location, key); + } + + @Override + public void deleteFile(Location location) + throws IOException + { + delegate.deleteFile(location); + } + + @Override + public void deleteFiles(Collection locations) + throws IOException + { + delegate.deleteFiles(locations); + } + + @Override + public void deleteDirectory(Location location) + throws IOException + { + delegate.deleteDirectory(location); + } + + @Override + public void renameFile(Location source, Location target) + throws IOException + { + delegate.renameFile(source, target); + } + + @Override + public FileIterator listFiles(Location location) + throws IOException + { + return delegate.listFiles(location); + } + + @Override + public Optional directoryExists(Location location) + throws IOException + { + return delegate.directoryExists(location); + } + + @Override + public void createDirectory(Location location) + throws IOException + { + delegate.createDirectory(location); + } + + @Override + public void renameDirectory(Location source, Location target) + throws IOException + { + delegate.renameDirectory(source, target); + } + + @Override + public Set listDirectories(Location location) + throws IOException + { + return delegate.listDirectories(location); + } + + @Override + public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) + throws IOException + { + return delegate.createTemporaryDirectory(targetPath, temporaryPrefix, relativePrefix); + } + + @Override + public Optional preSignedUri(Location location, Duration ttl) + throws IOException + { + return delegate.encryptedPreSignedUri(location, ttl, key); + } + + @Override + public Optional encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key) + throws IOException + { + checkArgument(this.key.equals(key), "Provided key is not the same as the class encryption key"); + return delegate.encryptedPreSignedUri(location, ttl, key); + } + + public TrinoFileSystem getDelegate() + { + return delegate; + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/encryption/EncryptionKey.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/encryption/EncryptionKey.java new file mode 100644 index 000000000000..474262df09a2 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/encryption/EncryptionKey.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.encryption; + +import java.util.concurrent.ThreadLocalRandom; + +import static java.util.Objects.requireNonNull; + +public record EncryptionKey(byte[] key, String algorithm) +{ + public EncryptionKey + { + requireNonNull(algorithm, "algorithm is null"); + requireNonNull(key, "key is null"); + } + + public static EncryptionKey randomAes256() + { + byte[] key = new byte[32]; + ThreadLocalRandom.current().nextBytes(key); + return new EncryptionKey(key, "AES256"); + } + + @Override + public String toString() + { + // We intentionally overwrite toString to hide a key + return algorithm; + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java index aa2531b2d573..5c65ccc1c9b6 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/switching/SwitchingFileSystem.java @@ -21,6 +21,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.UriLocation; +import io.trino.filesystem.encryption.EncryptionKey; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.security.ConnectorIdentity; @@ -157,6 +158,37 @@ public Optional preSignedUri(Location targetPath, Duration ttl) return fileSystem(targetPath).preSignedUri(targetPath, ttl); } + @Override + public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key) + { + return fileSystem(location).newEncryptedInputFile(location, key); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key) + { + return fileSystem(location).newEncryptedInputFile(location, length, key); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key) + { + return fileSystem(location).newEncryptedInputFile(location, length, lastModified, key); + } + + @Override + public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key) + { + return fileSystem(location).newEncryptedOutputFile(location, key); + } + + @Override + public Optional encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key) + throws IOException + { + return fileSystem(location).encryptedPreSignedUri(location, ttl, key); + } + private TrinoFileSystem fileSystem(Location location) { return createFileSystem(loader.apply(location)); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java index 5dd90fcb1abd..f7c6576fb7f6 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java @@ -22,6 +22,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.UriLocation; +import io.trino.filesystem.encryption.EncryptionKey; import java.io.IOException; import java.time.Instant; @@ -177,4 +178,50 @@ public Optional preSignedUri(Location location, Duration ttl) .startSpan(); return withTracing(span, () -> delegate.preSignedUri(location, ttl)); } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key) + { + Span span = tracer.spanBuilder("FileSystem.newEncryptedInputFile") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.newEncryptedInputFile(location, key)); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key) + { + Span span = tracer.spanBuilder("FileSystem.newEncryptedInputFile") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.newEncryptedInputFile(location, length, key)); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key) + { + Span span = tracer.spanBuilder("FileSystem.newEncryptedInputFile") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.newEncryptedInputFile(location, length, lastModified, key)); + } + + @Override + public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key) + { + Span span = tracer.spanBuilder("FileSystem.newEncryptedOutputFile") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.newEncryptedOutputFile(location, key)); + } + + @Override + public Optional encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key) + throws IOException + { + Span span = tracer.spanBuilder("FileSystem.encryptedPreSignedUri") + .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString()) + .startSpan(); + return withTracing(span, () -> delegate.encryptedPreSignedUri(location, ttl, key)); + } } diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java index 2b2df9116132..05498e5bb0cd 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java @@ -19,6 +19,7 @@ import com.google.common.io.Closer; import io.airlift.slice.Slice; import io.airlift.units.Duration; +import io.trino.filesystem.encryption.EncryptionEnforcingFileSystem; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -63,6 +64,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Fail.fail; import static org.junit.jupiter.api.Assumptions.abort; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -81,6 +83,11 @@ public abstract class AbstractTestTrinoFileSystem protected abstract void verifyFileSystemIsEmpty(); + protected boolean useServerSideEncryptionWithCustomerKey() + { + return false; + } + /** * Specifies whether implementation {@link TrinoOutputFile#create()} is exclusive. */ @@ -1424,6 +1431,37 @@ public void testLargeFileDoesNotExistUntilClosed() getFileSystem().deleteFile(location); } + @Test + void testServerSideEncryptionWithCustomerKey() + throws IOException + { + if (!useServerSideEncryptionWithCustomerKey()) { + abort("Test is specific to SSE-C"); + } + + Location location = getRootLocation().appendPath("encrypted"); + + byte[] data = "this is encrypted data".getBytes(UTF_8); + + // Create encrypted file + getFileSystem().newOutputFile(location) + .createOrOverwrite(data); + + if (!(getFileSystem() instanceof EncryptionEnforcingFileSystem encryptionEnforcingFileSystem)) { + fail("Expected file system to enforce server side encryption"); + return; + } + + // Try to read it without a key + assertThatThrownBy(() -> encryptionEnforcingFileSystem.getDelegate().newInputFile(location).newStream().readAllBytes()) + .isInstanceOf(IOException.class); + + assertThat(getFileSystem().newInputFile(location).newStream().readAllBytes()) + .isEqualTo(data); + + getFileSystem().deleteFile(location); + } + @SuppressWarnings("ConstantValue") private static byte[] getBytes() {