Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
Expand All @@ -51,12 +52,14 @@
final class S3FileSystem
implements TrinoFileSystem
{
private final ExecutorService uploadExecutor;
private final S3Client client;
private final S3Context context;
private final RequestPayer requestPayer;

public S3FileSystem(S3Client client, S3Context context)
public S3FileSystem(ExecutorService uploadExecutor, S3Client client, S3Context context)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.context = requireNonNull(context, "context is null");
this.requestPayer = context.requestPayer();
Expand All @@ -77,7 +80,7 @@ public TrinoInputFile newInputFile(Location location, long length)
@Override
public TrinoOutputFile newOutputFile(Location location)
{
return new S3OutputFile(client, context, new S3Location(location));
return new S3OutputFile(uploadExecutor, client, context, new S3Location(location));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static software.amazon.awssdk.core.retry.RetryMode getRetryMode(RetryMode
private String sseKmsKeyId;
private DataSize streamingPartSize = DataSize.of(16, MEGABYTE);
private boolean requesterPays;
private Integer maxConnections;
private Integer maxConnections = 500;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have a comment why this number. (i guess the reason is more than just "it's what we used to have in the legacy", otherwise we wouldn't make this change -- there are other things we used to have and we wanted not to carry on)

private Duration connectionTtl;
private Duration connectionMaxIdleTime;
private Duration socketConnectTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,22 @@

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.filesystem.s3.S3FileSystemConfig.RetryMode.getRetryMode;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY;
import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY;
import static java.lang.Math.toIntExact;
import static java.util.concurrent.Executors.newCachedThreadPool;

public final class S3FileSystemFactory
implements TrinoFileSystemFactory
{
private final S3Client client;
private final S3Context context;
private final ExecutorService uploadExecutor;

@Inject
public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig config)
Expand Down Expand Up @@ -120,12 +124,15 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
config.getSseKmsKeyId(),
Optional.empty(),
config.getCannedAcl());

this.uploadExecutor = newCachedThreadPool(daemonThreadsNamed("s3-upload-%s"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are no constructor provided args, you can inline this into the field declaration.

}

@PreDestroy
public void destroy()
{
client.close();
uploadExecutor.shutdownNow();
}

@Override
Expand All @@ -136,10 +143,10 @@ public TrinoFileSystem create(ConnectorIdentity identity)
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY),
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY),
identity.getExtraCredentials().get(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY)));
return new S3FileSystem(client, context.withCredentialsProviderOverride(credentialsProvider));
return new S3FileSystem(uploadExecutor, client, context.withCredentialsProviderOverride(credentialsProvider));
}

return new S3FileSystem(client, context);
return new S3FileSystem(uploadExecutor, client, context);
}

private static Optional<StaticCredentialsProvider> getStaticCredentialsProvider(S3FileSystemConfig config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

import static java.util.Objects.requireNonNull;

final class S3OutputFile
implements TrinoOutputFile
{
private final ExecutorService uploadExecutor;
private final S3Client client;
private final S3Context context;
private final S3Location location;

public S3OutputFile(S3Client client, S3Context context, S3Location location)
public S3OutputFile(ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location)
{
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.context = requireNonNull(context, "context is null");
this.location = requireNonNull(location, "location is null");
Expand All @@ -50,7 +53,7 @@ public void createOrOverwrite(byte[] data)
@Override
public OutputStream create(AggregatedMemoryContext memoryContext)
{
return new S3OutputStream(memoryContext, client, context, location);
return new S3OutputStream(memoryContext, uploadExecutor, client, context, location);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl;
Expand All @@ -55,6 +56,7 @@ final class S3OutputStream
{
private final List<CompletedPart> parts = new ArrayList<>();
private final LocalMemoryContext memoryContext;
private final ExecutorService uploadExecutor;
private final S3Client client;
private final S3Location location;
private final S3Context context;
Expand All @@ -79,9 +81,10 @@ final class S3OutputStream
// Visibility is ensured by calling get() on inProgressUploadFuture.
private Optional<String> uploadId = Optional.empty();

public S3OutputStream(AggregatedMemoryContext memoryContext, S3Client client, S3Context context, S3Location location)
public S3OutputStream(AggregatedMemoryContext memoryContext, ExecutorService uploadExecutor, S3Client client, S3Context context, S3Location location)
{
this.memoryContext = memoryContext.newLocalMemoryContext(S3OutputStream.class.getSimpleName());
this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
this.client = requireNonNull(client, "client is null");
this.location = requireNonNull(location, "location is null");
this.context = requireNonNull(context, "context is null");
Expand Down Expand Up @@ -249,7 +252,7 @@ private void flushBuffer(boolean finished)
throw e;
}
multipartUploadStarted = true;
inProgressUploadFuture = supplyAsync(() -> uploadPage(data, length));
inProgressUploadFuture = supplyAsync(() -> uploadPage(data, length), uploadExecutor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testDefaults()
.setSseKmsKeyId(null)
.setStreamingPartSize(DataSize.of(16, MEGABYTE))
.setRequesterPays(false)
.setMaxConnections(null)
.setMaxConnections(500)
.setConnectionTtl(null)
.setConnectionMaxIdleTime(null)
.setSocketConnectTimeout(null)
Expand Down