diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java index 70717d67b084..085655eb767a 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/ExchangeS3Config.java @@ -17,6 +17,7 @@ import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.ConfigSecuritySensitive; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.airlift.units.MaxDataSize; import io.airlift.units.MinDataSize; import software.amazon.awssdk.core.retry.RetryMode; @@ -30,6 +31,7 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.Locale.ENGLISH; +import static java.util.concurrent.TimeUnit.MINUTES; public class ExchangeS3Config { @@ -43,7 +45,9 @@ public class ExchangeS3Config private DataSize s3UploadPartSize = DataSize.of(5, MEGABYTE); private StorageClass storageClass = StorageClass.STANDARD; private RetryMode retryMode = RetryMode.ADAPTIVE; - private int asyncClientConcurrency = 500; + private int asyncClientConcurrency = 100; + private int asyncClientMaxPendingConnectionAcquires = 10000; + private Duration connectionAcquisitionTimeout = new Duration(1, MINUTES); public String getS3AwsAccessKey() { @@ -176,4 +180,29 @@ public ExchangeS3Config setAsyncClientConcurrency(int asyncClientConcurrency) this.asyncClientConcurrency = asyncClientConcurrency; return this; } + + @Min(1) + public int getAsyncClientMaxPendingConnectionAcquires() + { + return asyncClientMaxPendingConnectionAcquires; + } + + @Config("exchange.s3.async-client-max-pending-connection-acquires") + public ExchangeS3Config setAsyncClientMaxPendingConnectionAcquires(int asyncClientMaxPendingConnectionAcquires) + { + this.asyncClientMaxPendingConnectionAcquires = asyncClientMaxPendingConnectionAcquires; + return this; + } + + public Duration getConnectionAcquisitionTimeout() + { + return connectionAcquisitionTimeout; + } + + @Config("exchange.s3.async-client-connection-acquisition-timeout") + public ExchangeS3Config setConnectionAcquisitionTimeout(Duration connectionAcquisitionTimeout) + { + this.connectionAcquisitionTimeout = connectionAcquisitionTimeout; + return this; + } } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java index e66e2c55a7b9..b869bb3dd356 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/s3/S3FileSystemExchangeStorage.java @@ -22,6 +22,7 @@ import io.airlift.slice.Slice; import io.airlift.slice.SliceInput; import io.airlift.slice.Slices; +import io.airlift.units.Duration; import io.trino.plugin.exchange.ExchangeSourceFile; import io.trino.plugin.exchange.ExchangeStorageReader; import io.trino.plugin.exchange.ExchangeStorageWriter; @@ -135,7 +136,12 @@ public S3FileSystemExchangeStorage(ExchangeS3Config config) .build(); this.s3Client = createS3Client(credentialsProvider, overrideConfig); - this.s3AsyncClient = createS3AsyncClient(credentialsProvider, overrideConfig, config.getAsyncClientConcurrency()); + this.s3AsyncClient = createS3AsyncClient( + credentialsProvider, + overrideConfig, + config.getAsyncClientConcurrency(), + config.getAsyncClientMaxPendingConnectionAcquires(), + config.getConnectionAcquisitionTimeout()); } @Override @@ -387,13 +393,20 @@ private S3Client createS3Client(AwsCredentialsProvider credentialsProvider, Clie return clientBuilder.build(); } - private S3AsyncClient createS3AsyncClient(AwsCredentialsProvider credentialsProvider, ClientOverrideConfiguration overrideConfig, int maxConcurrency) + private S3AsyncClient createS3AsyncClient( + AwsCredentialsProvider credentialsProvider, + ClientOverrideConfiguration overrideConfig, + int maxConcurrency, + int maxPendingConnectionAcquires, + Duration connectionAcquisitionTimeout) { S3AsyncClientBuilder clientBuilder = S3AsyncClient.builder() .credentialsProvider(credentialsProvider) .overrideConfiguration(overrideConfig) .httpClientBuilder(NettyNioAsyncHttpClient.builder() - .maxConcurrency(maxConcurrency)); + .maxConcurrency(maxConcurrency) + .maxPendingConnectionAcquires(maxPendingConnectionAcquires) + .connectionAcquisitionTimeout(java.time.Duration.ofMillis(connectionAcquisitionTimeout.toMillis()))); region.ifPresent(clientBuilder::region); endpoint.ifPresent(s3Endpoint -> clientBuilder.endpointOverride(URI.create(s3Endpoint))); diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java index 2a8bc1a7ffcf..7a21c26c0522 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/s3/TestExchangeS3Config.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import org.testng.annotations.Test; import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.services.s3.model.StorageClass; @@ -25,6 +26,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.util.concurrent.TimeUnit.MINUTES; public class TestExchangeS3Config { @@ -41,7 +43,9 @@ public void testDefaults() .setS3UploadPartSize(DataSize.of(5, MEGABYTE)) .setStorageClass(StorageClass.STANDARD) .setRetryMode(RetryMode.ADAPTIVE) - .setAsyncClientConcurrency(500)); + .setAsyncClientConcurrency(100) + .setAsyncClientMaxPendingConnectionAcquires(10000) + .setConnectionAcquisitionTimeout(new Duration(1, MINUTES))); } @Test @@ -58,6 +62,8 @@ public void testExplicitPropertyMappings() .put("exchange.s3.storage-class", "REDUCED_REDUNDANCY") .put("exchange.s3.retry-mode", "STANDARD") .put("exchange.s3.async-client-concurrency", "202") + .put("exchange.s3.async-client-max-pending-connection-acquires", "999") + .put("exchange.s3.async-client-connection-acquisition-timeout", "5m") .buildOrThrow(); ExchangeS3Config expected = new ExchangeS3Config() @@ -70,7 +76,9 @@ public void testExplicitPropertyMappings() .setS3UploadPartSize(DataSize.of(10, MEGABYTE)) .setStorageClass(StorageClass.REDUCED_REDUNDANCY) .setRetryMode(RetryMode.STANDARD) - .setAsyncClientConcurrency(202); + .setAsyncClientConcurrency(202) + .setAsyncClientMaxPendingConnectionAcquires(999) + .setConnectionAcquisitionTimeout(new Duration(5, MINUTES)); assertFullMapping(properties, expected); }