Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/trino-filesystem-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/TestS3FileSystemAwsS3.java</exclude>
<exclude>**/TestS3FileSystemAwsS3*.java</exclude>
</excludes>
</configuration>
</plugin>
Expand All @@ -320,7 +320,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestS3FileSystemAwsS3.java</include>
<include>**/TestS3FileSystemAwsS3*.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.filesystem.UriLocation;
import io.trino.filesystem.encryption.EncryptionKey;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
Expand Down Expand Up @@ -56,6 +57,8 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.partition;
import static com.google.common.collect.Multimaps.toMultimap;
import static io.trino.filesystem.s3.S3SseCUtils.encoded;
import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

Expand All @@ -80,25 +83,49 @@ public S3FileSystem(Executor uploadExecutor, S3Client client, S3Presigner preSig
@Override
public TrinoInputFile newInputFile(Location location)
{
return new S3InputFile(client, context, new S3Location(location), null, null);
return new S3InputFile(client, context, new S3Location(location), null, null, Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new S3InputFile(client, context, new S3Location(location), length, null);
return new S3InputFile(client, context, new S3Location(location), length, null, Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
return new S3InputFile(client, context, new S3Location(location), length, lastModified);
return new S3InputFile(client, context, new S3Location(location), length, lastModified, Optional.empty());
}

@Override
public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key)
{
return new S3InputFile(client, context, new S3Location(location), null, null, Optional.of(key));
}

@Override
public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key)
{
return new S3InputFile(client, context, new S3Location(location), length, null, Optional.of(key));
}

@Override
public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key)
{
return new S3InputFile(client, context, new S3Location(location), length, lastModified, Optional.of(key));
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
return new S3OutputFile(uploadExecutor, client, context, new S3Location(location));
return new S3OutputFile(uploadExecutor, client, context, new S3Location(location), Optional.empty());
}

@Override
public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key)
{
return new S3OutputFile(uploadExecutor, client, context, new S3Location(location), Optional.of(key));
}

@Override
Expand Down Expand Up @@ -295,6 +322,19 @@ public Optional<Location> createTemporaryDirectory(Location targetPath, String t
@Override
public Optional<UriLocation> preSignedUri(Location location, Duration ttl)
throws IOException
{
return encryptedPreSignedUri(location, ttl, Optional.empty());
}

@Override
public Optional<UriLocation> encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key)
Comment thread
wendigo marked this conversation as resolved.
Outdated
Comment thread
wendigo marked this conversation as resolved.
throws IOException
{
return encryptedPreSignedUri(location, ttl, Optional.of(key));
}

public Optional<UriLocation> encryptedPreSignedUri(Location location, Duration ttl, Optional<EncryptionKey> key)
throws IOException
{
location.verifyValidFileLocation();
S3Location s3Location = new S3Location(location);
Expand All @@ -304,6 +344,11 @@ public Optional<UriLocation> preSignedUri(Location location, Duration ttl)
.requestPayer(requestPayer)
.key(s3Location.key())
.bucket(s3Location.bucket())
.applyMutation(builder -> key.ifPresent(encryption -> {
builder.sseCustomerKeyMD5(md5Checksum(encryption));
builder.sseCustomerAlgorithm(encryption.algorithm());
builder.sseCustomerKey(encoded(encryption));
}))
.build();

GetObjectPresignRequest preSignRequest = GetObjectPresignRequest.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.filesystem.TrinoInput;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;
import io.trino.filesystem.encryption.EncryptionKey;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -29,7 +30,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;

import static io.trino.filesystem.s3.S3SseCUtils.encoded;
import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum;
import static java.util.Objects.requireNonNull;

final class S3InputFile
Expand All @@ -39,17 +43,19 @@ final class S3InputFile
private final S3Location location;
private final S3Context context;
private final RequestPayer requestPayer;
private final Optional<EncryptionKey> key;
private Long length;
private Instant lastModified;

public S3InputFile(S3Client client, S3Context context, S3Location location, Long length, Instant lastModified)
public S3InputFile(S3Client client, S3Context context, S3Location location, Long length, Instant lastModified, Optional<EncryptionKey> key)
{
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
this.length = length;
this.lastModified = lastModified;
this.key = requireNonNull(key, "key is null");
location.location().verifyValidFileLocation();
}

Expand Down Expand Up @@ -105,6 +111,11 @@ private GetObjectRequest newGetObjectRequest()
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
.applyMutation(builder -> key.ifPresent(encryption -> {
builder.sseCustomerKey(encoded(encryption));
builder.sseCustomerAlgorithm(encryption.algorithm());
builder.sseCustomerKeyMD5(md5Checksum(encryption));
}))
.build();
}

Expand All @@ -116,6 +127,11 @@ private boolean headObject()
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
.applyMutation(builder -> key.ifPresent(encryption -> {
builder.sseCustomerKey(encoded(encryption));
builder.sseCustomerAlgorithm(encryption.algorithm());
builder.sseCustomerKeyMD5(md5Checksum(encryption));
}))
.build();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.filesystem.encryption.EncryptionKey;
import io.trino.memory.context.AggregatedMemoryContext;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.Executor;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
Expand All @@ -32,13 +34,15 @@ final class S3OutputFile
private final S3Client client;
private final S3Context context;
private final S3Location location;
private final Optional<EncryptionKey> key;

public S3OutputFile(Executor uploadExecutor, S3Client client, S3Context context, S3Location location)
public S3OutputFile(Executor uploadExecutor, S3Client client, S3Context context, S3Location location, Optional<EncryptionKey> key)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.context = requireNonNull(context, "context is null");
this.location = requireNonNull(location, "location is null");
this.key = requireNonNull(key, "key is null");
location.location().verifyValidFileLocation();
}

Expand Down Expand Up @@ -72,7 +76,7 @@ public OutputStream create(AggregatedMemoryContext memoryContext)

public OutputStream create(AggregatedMemoryContext memoryContext, boolean exclusive)
{
return new S3OutputStream(memoryContext, uploadExecutor, client, context, location, exclusive);
return new S3OutputStream(memoryContext, uploadExecutor, client, context, location, exclusive, key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.filesystem.s3;

import io.trino.filesystem.encryption.EncryptionKey;
import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
Expand Down Expand Up @@ -43,7 +44,10 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import static com.google.common.base.Verify.verify;
import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl;
import static io.trino.filesystem.s3.S3SseCUtils.encoded;
import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum;
import static java.lang.Math.clamp;
import static java.lang.Math.max;
import static java.lang.Math.min;
Expand All @@ -69,6 +73,7 @@ final class S3OutputStream
private final String sseKmsKeyId;
private final ObjectCannedACL cannedAcl;
private final boolean exclusiveCreate;
private final Optional<EncryptionKey> key;

private int currentPartNumber;
private byte[] buffer = new byte[0];
Expand All @@ -85,7 +90,7 @@ final class S3OutputStream
// Visibility is ensured by calling get() on inProgressUploadFuture.
private Optional<String> uploadId = Optional.empty();

public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location, boolean exclusiveCreate)
public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location, boolean exclusiveCreate, Optional<EncryptionKey> key)
{
this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
Expand All @@ -98,6 +103,9 @@ public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExec
this.sseType = context.sseType();
this.sseKmsKeyId = context.sseKmsKeyId();
this.cannedAcl = getCannedAcl(context.cannedAcl());
this.key = requireNonNull(key, "key is null");

verify(key.isEmpty() || sseType == S3SseType.NONE, "Encryption key cannot be used with sse configuration");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only one place when you touch sseType config variable
As I understand, this line states that if we provide key the sseType needs to be set to S3SseType.NONE to allow further processing? is this correct ?
imho, judging by the rest of the code in this PR, the PR seems to be irrelevant to the settings of sseType config variable at all

}

@SuppressWarnings("NumericCastThatLosesPrecision")
Expand Down Expand Up @@ -216,6 +224,11 @@ private void flushBuffer(boolean finished)
if (exclusiveCreate) {
builder.ifNoneMatch("*");
}
key.ifPresent(encryption -> {
builder.sseCustomerKey(encoded(encryption));
builder.sseCustomerAlgorithm(encryption.algorithm());
builder.sseCustomerKeyMD5(md5Checksum(encryption));
});
switch (sseType) {
case NONE -> { /* ignored */ }
case S3 -> builder.serverSideEncryption(AES256);
Expand Down Expand Up @@ -301,6 +314,11 @@ private CompletedPart uploadPage(byte[] data, int length)
.bucket(location.bucket())
.key(location.key())
.applyMutation(builder -> {
key.ifPresent(encryption -> {
builder.sseCustomerKey(encoded(encryption));
builder.sseCustomerAlgorithm(encryption.algorithm());
builder.sseCustomerKeyMD5(md5Checksum(encryption));
});
switch (sseType) {
case NONE -> { /* ignored */ }
case S3 -> builder.serverSideEncryption(AES256);
Expand All @@ -321,6 +339,11 @@ private CompletedPart uploadPage(byte[] data, int length)
.contentLength((long) length)
.uploadId(uploadId.get())
.partNumber(currentPartNumber)
.applyMutation(builder -> key.ifPresent(encryption -> {
builder.sseCustomerKey(encoded(encryption));
builder.sseCustomerAlgorithm(encryption.algorithm());
builder.sseCustomerKeyMD5(md5Checksum(encryption));
}))
.build();

ByteBuffer bytes = ByteBuffer.wrap(data, 0, length);
Expand All @@ -346,6 +369,11 @@ private void finishUpload(String uploadId)
.uploadId(uploadId)
.multipartUpload(x -> x.parts(parts))
.applyMutation(builder -> {
key.ifPresent(encodingKey -> {
builder.sseCustomerKey(encoded(encodingKey));
builder.sseCustomerAlgorithm(encodingKey.algorithm());
builder.sseCustomerKeyMD5(md5Checksum(encodingKey));
});
if (exclusiveCreate) {
builder.ifNoneMatch("*");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.s3;

import io.trino.filesystem.encryption.EncryptionKey;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class S3SseCUtils
Comment thread
wendigo marked this conversation as resolved.
{
private S3SseCUtils() {}

public static String encoded(EncryptionKey key)
{
return Base64.getEncoder().encodeToString(key.key());
}

public static String md5Checksum(EncryptionKey key)
{
try {
MessageDigest digest = MessageDigest.getInstance("MD5");
Comment thread
wendigo marked this conversation as resolved.
digest.update(key.key());
return Base64.getEncoder().encodeToString(digest.digest());
}
catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
}
Loading