diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index bd41f4c47e91..6e5393f2560e 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -410,7 +410,7 @@ the property may be configured for: * - ``exchange.s3.max-error-retries`` - Maximum number of times the exchange manager's S3 client should retry a request. - - ``3`` + - ``10`` - Any S3-compatible storage * - ``exchange.s3.upload.part-size`` - Part size for S3 multi-part upload. @@ -435,6 +435,11 @@ the property may be configured for: - Block size for Azure block blob parallel upload. - ``4MB`` - Azure Blob Storage + * - ``exchange.azure.max-error-retries`` + - Maximum number of times the exchange manager's Azure client should + retry a request. + - ``10`` + - Azure Blob Storage It is recommended to set the ``exchange.compression-enabled`` property to ``true`` in the cluster's ``config.properties`` file, to reduce the exchange diff --git a/plugin/trino-exchange-filesystem/pom.xml b/plugin/trino-exchange-filesystem/pom.xml index cced62ba69af..fd78721a8bc2 100644 --- a/plugin/trino-exchange-filesystem/pom.xml +++ b/plugin/trino-exchange-filesystem/pom.xml @@ -111,6 +111,11 @@ azure-storage-blob-batch + + com.azure + azure-storage-common + + com.google.api gax diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java index 8390e9f80dae..d6952eb3acf2 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.java @@ -27,6 +27,8 @@ import com.azure.storage.blob.models.DeleteSnapshotsOptionType; import com.azure.storage.blob.models.ListBlobsOptions; import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.azure.storage.common.policy.RequestRetryOptions; +import com.azure.storage.common.policy.RetryPolicyType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Lists; @@ -96,9 +98,10 @@ public AzureBlobFileSystemExchangeStorage(ExchangeAzureConfig config) { requireNonNull(config, "config is null"); this.blockSize = toIntExact(config.getAzureStorageBlockSize().toBytes()); - Optional connectionString = config.getAzureStorageConnectionString(); - BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder(); + BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder() + .retryOptions(new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, config.getMaxErrorRetries(), (Integer) null, null, null, null)); + Optional connectionString = config.getAzureStorageConnectionString(); if (connectionString.isPresent()) { blobServiceClientBuilder.connectionString(connectionString.get()); } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java index a51334f5cc49..1117ef34f404 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/azure/ExchangeAzureConfig.java @@ -20,6 +20,7 @@ import io.airlift.units.MaxDataSize; import io.airlift.units.MinDataSize; +import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; import java.util.Optional; @@ -30,6 +31,7 @@ public class ExchangeAzureConfig { private Optional azureStorageConnectionString = Optional.empty(); private DataSize azureStorageBlockSize = DataSize.of(4, MEGABYTE); + private int maxErrorRetries = 10; public Optional getAzureStorageConnectionString() { @@ -59,4 +61,17 @@ public ExchangeAzureConfig setAzureStorageBlockSize(DataSize azureStorageBlockSi this.azureStorageBlockSize = azureStorageBlockSize; return this; } + + @Min(0) + public int getMaxErrorRetries() + { + return maxErrorRetries; + } + + @Config("exchange.azure.max-error-retries") + public ExchangeAzureConfig setMaxErrorRetries(int maxErrorRetries) + { + this.maxErrorRetries = maxErrorRetries; + return this; + } } diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java index 0a212002aa38..7717e149b63e 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/azure/TestExchangeAzureConfig.java @@ -31,7 +31,8 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(ExchangeAzureConfig.class) .setAzureStorageConnectionString(null) - .setAzureStorageBlockSize(DataSize.of(4, MEGABYTE))); + .setAzureStorageBlockSize(DataSize.of(4, MEGABYTE)) + .setMaxErrorRetries(10)); } @Test @@ -40,11 +41,13 @@ public void testExplicitPropertyMappings() Map properties = ImmutableMap.builder() .put("exchange.azure.connection-string", "connection") .put("exchange.azure.block-size", "8MB") + .put("exchange.azure.max-error-retries", "8") .buildOrThrow(); ExchangeAzureConfig expected = new ExchangeAzureConfig() .setAzureStorageConnectionString("connection") - .setAzureStorageBlockSize(DataSize.of(8, MEGABYTE)); + .setAzureStorageBlockSize(DataSize.of(8, MEGABYTE)) + .setMaxErrorRetries(8); assertFullMapping(properties, expected); }