Skip to content

Commit

Permalink
Add support for S3 conditional writes
Browse files Browse the repository at this point in the history
This allows implementing an exclusive write, which ensures that
the next putObject won't overwrite an existing file.
  • Loading branch information
wendigo committed Aug 28, 2024
1 parent dc9d7c5 commit 1a8551c
Show file tree
Hide file tree
Showing 18 changed files with 101 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ protected final void verifyFileSystemIsEmpty()
assertThat(blobContainerClient.listBlobs()).map(BlobItem::getName).isEmpty();
}

/**
 * Test hook override: reports exclusive create as supported (returns {@code true}).
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return true;
}

@Test
@Override
public void testPaths()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ protected boolean isHierarchical()
return false;
}

/**
 * Test hook override: reports exclusive create as supported (returns {@code true}).
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return true;
}

@Override
protected TrinoFileSystem getFileSystem()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ protected void verifyFileSystemIsEmpty()
assertThat(storage.list(bucket).iterateAll()).isEmpty();
}

/**
 * Test hook override, declared {@code final} so subclasses cannot change it:
 * reports exclusive create as supported (returns {@code true}).
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected final boolean supportsCreateExclusive()
{
return true;
}

@Override
protected final boolean supportsRenameFile()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
import static java.util.Objects.requireNonNull;

record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional<AwsCredentialsProvider> credentialsProviderOverride, ObjectCannedAcl cannedAcl)
record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional<AwsCredentialsProvider> credentialsProviderOverride, ObjectCannedAcl cannedAcl, boolean exclusiveWriteSupported)
{
private static final int MIN_PART_SIZE = 5 * 1024 * 1024; // S3 requirement

Expand All @@ -49,7 +49,7 @@ public RequestPayer requestPayer()

public S3Context withKmsKeyId(String kmsKeyId)
{
return new S3Context(partSize, requesterPays, sseType, kmsKeyId, credentialsProviderOverride, cannedAcl);
return new S3Context(partSize, requesterPays, sseType, kmsKeyId, credentialsProviderOverride, cannedAcl, exclusiveWriteSupported);
}

public S3Context withCredentials(ConnectorIdentity identity)
Expand All @@ -72,7 +72,8 @@ public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credenti
sseType,
sseKmsKeyId,
Optional.of(credentialsProviderOverride),
cannedAcl);
cannedAcl,
exclusiveWriteSupported);
}

public void applyCredentialProviderOverride(AwsRequestOverrideConfiguration.Builder builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public static RetryStrategy getRetryStrategy(RetryMode retryMode)
private ObjectCannedAcl objectCannedAcl = ObjectCannedAcl.NONE;
private RetryMode retryMode = RetryMode.LEGACY;
private int maxErrorRetries = 10;
private boolean supportsExclusiveCreate = true;

public String getAwsAccessKey()
{
Expand Down Expand Up @@ -497,4 +498,17 @@ public S3FileSystemConfig setNonProxyHosts(String nonProxyHosts)
this.nonProxyHosts = ImmutableSet.copyOf(Splitter.on(',').omitEmptyStrings().trimResults().split(nullToEmpty(nonProxyHosts)));
return this;
}

/**
 * Returns the current value of the exclusive-create flag held by this config object.
 *
 * @return {@code true} if the storage is configured as supporting exclusive create
 */
public boolean isSupportsExclusiveCreate()
{
return supportsExclusiveCreate;
}

/**
 * Sets the exclusive-create flag; bound to the {@code s3.exclusive-create} configuration property.
 *
 * @param supportsExclusiveCreate whether the S3-compatible storage supports exclusive create
 * @return this config instance, so setter calls can be chained
 */
@Config("s3.exclusive-create")
@ConfigDescription("Whether S3-compatible storage supports exclusive create (true for Minio and AWS S3)")
public S3FileSystemConfig setSupportsExclusiveCreate(boolean supportsExclusiveCreate)
{
this.supportsExclusiveCreate = supportsExclusiveCreate;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ private S3FileSystemLoader(Optional<S3SecurityMappingProvider> mappingProvider,
config.getSseType(),
config.getSseKmsKeyId(),
Optional.empty(),
config.getCannedAcl());
config.getCannedAcl(),
config.isSupportsExclusiveCreate());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.OutputStream;
import java.util.concurrent.Executor;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static java.util.Objects.requireNonNull;

final class S3OutputFile
Expand All @@ -45,15 +46,33 @@ public S3OutputFile(Executor uploadExecutor, S3Client client, S3Context context,
public void createOrOverwrite(byte[] data)
throws IOException
{
try (OutputStream out = create()) {
try (OutputStream out = create(newSimpleAggregatedMemoryContext(), false)) {
out.write(data);
}
}

/**
 * Writes {@code data} to this file's location through a stream opened in exclusive mode.
 *
 * @param data the bytes to write
 * @throws UnsupportedOperationException if the context reports that exclusive writes are not supported
 * @throws IOException if writing to or closing the stream fails
 */
@Override
public void createExclusive(byte[] data)
        throws IOException
{
    boolean exclusiveWriteAvailable = context.exclusiveWriteSupported();
    if (exclusiveWriteAvailable) {
        // A fresh memory context is used because the caller does not supply one.
        try (OutputStream stream = create(newSimpleAggregatedMemoryContext(), true)) {
            stream.write(data);
        }
        return;
    }

    throw new UnsupportedOperationException("createExclusive not supported by " + getClass());
}

@Override
public OutputStream create(AggregatedMemoryContext memoryContext)
{
return new S3OutputStream(memoryContext, uploadExecutor, client, context, location);
return create(memoryContext, context.exclusiveWriteSupported());
}

/**
 * Opens a new {@link S3OutputStream} for this file's location.
 *
 * @param memoryContext memory context handed to the stream
 * @param exclusive passed through to the stream as its exclusive-create flag
 * @return the newly created output stream
 */
public OutputStream create(AggregatedMemoryContext memoryContext, boolean exclusive)
{
    S3OutputStream stream = new S3OutputStream(
            memoryContext,
            uploadExecutor,
            client,
            context,
            location,
            exclusive);
    return stream;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.RequestPayer;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -46,6 +48,7 @@
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.System.arraycopy;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static software.amazon.awssdk.services.s3.model.ServerSideEncryption.AES256;
Expand All @@ -65,6 +68,7 @@ final class S3OutputStream
private final S3SseType sseType;
private final String sseKmsKeyId;
private final ObjectCannedACL cannedAcl;
private final boolean exclusiveCreate;

private int currentPartNumber;
private byte[] buffer = new byte[0];
Expand All @@ -81,12 +85,13 @@ final class S3OutputStream
// Visibility is ensured by calling get() on inProgressUploadFuture.
private Optional<String> uploadId = Optional.empty();

public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location)
public S3OutputStream(AggregatedMemoryContext memoryContext, Executor uploadExecutor, S3Client client, S3Context context, S3Location location, boolean exclusiveCreate)
{
this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.exclusiveCreate = exclusiveCreate;
this.context = requireNonNull(context, "context is null");
this.partSize = context.partSize();
this.requestPayer = context.requestPayer();
Expand Down Expand Up @@ -208,6 +213,9 @@ private void flushBuffer(boolean finished)
.key(location.key())
.contentLength((long) bufferSize)
.applyMutation(builder -> {
if (exclusiveCreate) {
builder.ifNoneMatch("*");
}
switch (sseType) {
case NONE -> { /* ignored */ }
case S3 -> builder.serverSideEncryption(AES256);
Expand All @@ -222,6 +230,14 @@ private void flushBuffer(boolean finished)
client.putObject(request, RequestBody.fromByteBuffer(bytes));
return;
}
catch (S3Exception e) {
failed = true;
// when `location` already exists, the operation will fail with `412 Precondition Failed`
if (e.statusCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(location.toString());
}
throw new IOException("Put failed for bucket [%s] key [%s]: %s".formatted(location.bucket(), location.key(), e), e);
}
catch (SdkException e) {
failed = true;
throw new IOException("Put failed for bucket [%s] key [%s]: %s".formatted(location.bucket(), location.key(), e), e);
Expand Down Expand Up @@ -329,6 +345,11 @@ private void finishUpload(String uploadId)
.key(location.key())
.uploadId(uploadId)
.multipartUpload(x -> x.parts(parts))
.applyMutation(builder -> {
if (exclusiveCreate) {
builder.ifNoneMatch("*");
}
})
.build();

client.completeMultipartUpload(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ protected final Location getRootLocation()
return Location.of("s3://%s/".formatted(bucket()));
}

/**
 * Test hook override: returns {@code false}.
 * NOTE(review): presumably tells the shared tests that a plain create is not exclusive
 * for this file system — confirm against the base class.
 */
@Override
protected boolean isCreateExclusive()
{
return false;
}

@Override
protected final boolean supportsRenameFile()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setAwsAccessKey(accessKey)
.setAwsSecretKey(secretKey)
.setRegion(region)
.setSupportsExclusiveCreate(true)
.setStreamingPartSize(DataSize.valueOf("5.5MB")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public void testDefaults()
.setNonProxyHosts(null)
.setHttpProxyUsername(null)
.setHttpProxyPassword(null)
.setHttpProxyPreemptiveBasicProxyAuth(false));
.setHttpProxyPreemptiveBasicProxyAuth(false)
.setSupportsExclusiveCreate(true));
}

@Test
Expand Down Expand Up @@ -103,6 +104,7 @@ public void testExplicitPropertyMappings()
.put("s3.http-proxy.username", "test")
.put("s3.http-proxy.password", "test")
.put("s3.http-proxy.preemptive-basic-auth", "true")
.put("s3.exclusive-create", "false")
.buildOrThrow();

S3FileSystemConfig expected = new S3FileSystemConfig()
Expand Down Expand Up @@ -135,7 +137,8 @@ public void testExplicitPropertyMappings()
.setNonProxyHosts("test1, test2, test3")
.setHttpProxyUsername("test")
.setHttpProxyPassword("test")
.setHttpProxyPreemptiveBasicProxyAuth(true);
.setHttpProxyPreemptiveBasicProxyAuth(true)
.setSupportsExclusiveCreate(false);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ protected void initEnvironment()
}
}

/**
 * Test hook override: returns {@code false} because localstack does not support the feature.
 * NOTE(review): presumably tells the shared tests that a plain create is not exclusive
 * in this environment — confirm against the base class.
 */
@Override
protected boolean isCreateExclusive()
{
return false; // not supported by localstack
}

/**
 * Test hook override: reports exclusive create as unsupported (returns {@code false})
 * because localstack does not support it.
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return false; // not supported by localstack
}

@Override
protected String bucket()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setPathStyleAccess(true)
.setAwsAccessKey(Minio.MINIO_ACCESS_KEY)
.setAwsSecretKey(Minio.MINIO_SECRET_KEY)
.setSupportsExclusiveCreate(true)
.setStreamingPartSize(DataSize.valueOf("5.5MB")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ public class TestS3FileSystemS3Mock
private static final S3MockContainer S3_MOCK = new S3MockContainer("3.0.1")
.withInitialBuckets(BUCKET);

/**
 * Test hook override: returns {@code false} because s3-mock does not support the feature.
 * NOTE(review): presumably tells the shared tests that a plain create is not exclusive
 * in this environment — confirm against the base class.
 */
@Override
protected boolean isCreateExclusive()
{
return false; // not supported by s3-mock
}

/**
 * Test hook override: reports exclusive create as unsupported (returns {@code false})
 * because s3-mock does not support it.
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return false; // not supported by s3-mock
}

@Override
protected String bucket()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected boolean isCreateExclusive()
*/
protected boolean supportsCreateExclusive()
{
return false;
return true;
}

protected boolean supportsRenameFile()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ void beforeAll()
fileSystem = new LocalFileSystem(tempDirectory);
}

/**
 * Test hook override: reports exclusive create as unsupported (returns {@code false})
 * for the file system exercised by this test.
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return false;
}

@AfterEach
void afterEach()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ protected boolean isHierarchical()
return false;
}

/**
 * Test hook override: reports exclusive create as supported (returns {@code true}).
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return true;
}

@Override
protected TrinoFileSystem getFileSystem()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ protected boolean isHierarchical()
return false;
}

/**
 * Test hook override: reports exclusive create as supported (returns {@code true}).
 * NOTE(review): how the shared test base consumes this flag is not visible here — confirm there.
 */
@Override
protected boolean supportsCreateExclusive()
{
return true;
}

@Override
protected TrinoFileSystem getFileSystem()
{
Expand Down

0 comments on commit 1a8551c

Please sign in to comment.