diff --git a/lib/trino-filesystem-gcs/pom.xml b/lib/trino-filesystem-gcs/pom.xml
index 33acd16414f3..a958870f3926 100644
--- a/lib/trino-filesystem-gcs/pom.xml
+++ b/lib/trino-filesystem-gcs/pom.xml
@@ -123,6 +123,11 @@
jakarta.validation-api
+
+ org.threeten
+ threetenbp
+
+
io.trino
trino-spi
@@ -141,6 +146,12 @@
test
+
+ io.airlift
+ testing
+ test
+
+
io.trino
trino-filesystem
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java
index f71b4997f695..b5c8652ccdd3 100644
--- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConfig.java
@@ -18,10 +18,15 @@
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.configuration.validation.FileExists;
import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
+import io.airlift.units.MinDuration;
import jakarta.annotation.Nullable;
+import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
+import java.util.concurrent.TimeUnit;
+
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -37,6 +42,12 @@ public class GcsFileSystemConfig
private boolean useGcsAccessToken;
private String jsonKey;
private String jsonKeyFilePath;
+ private int maxRetries = 20;
+ private double backoffScaleFactor = 2.0;
+ private Duration maxRetryTime = new Duration(20, TimeUnit.SECONDS);
+ private Duration minBackoffDelay = new Duration(10, TimeUnit.MILLISECONDS);
+ // Note: there is no benefit to setting this much higher as the rpc quota is 1x per second: https://cloud.google.com/storage/docs/retry-strategy#java
+ private Duration maxBackoffDelay = new Duration(1100, TimeUnit.MILLISECONDS);
@NotNull
public DataSize getReadBlockSize()
@@ -148,14 +159,92 @@ public GcsFileSystemConfig setJsonKeyFilePath(String jsonKeyFilePath)
return this;
}
+ @Min(0)
+ public int getMaxRetries()
+ {
+ return maxRetries;
+ }
+
+ @Config("gcs.client.max-retries")
+ @ConfigDescription("Maximum number of times an RPC is retried after the initial attempt")
+ public GcsFileSystemConfig setMaxRetries(int maxRetries)
+ {
+ this.maxRetries = maxRetries; // counts retries only: GcsStorageFactory passes maxRetries + 1 as the attempt limit, so 0 means a single attempt
+ return this;
+ }
+
+ @Min(1)
+ public double getBackoffScaleFactor()
+ {
+ return backoffScaleFactor;
+ }
+
+ @Config("gcs.client.backoff-scale-factor")
+ @ConfigDescription("Scale factor for RPC retry delay")
+ public GcsFileSystemConfig setBackoffScaleFactor(double backoffScaleFactor)
+ {
+ this.backoffScaleFactor = backoffScaleFactor;
+ return this;
+ }
+
+ @NotNull
+ public Duration getMaxRetryTime()
+ {
+ return maxRetryTime;
+ }
+
+ @Config("gcs.client.max-retry-time")
+ @ConfigDescription("Total time limit for an RPC to be retried")
+ public GcsFileSystemConfig setMaxRetryTime(Duration maxRetryTime)
+ {
+ this.maxRetryTime = maxRetryTime;
+ return this;
+ }
+
+ @NotNull
+ @MinDuration("0ms")
+ public Duration getMinBackoffDelay()
+ {
+ return minBackoffDelay;
+ }
+
+ @Config("gcs.client.min-backoff-delay")
+ @ConfigDescription("Minimum delay between RPC retries")
+ public GcsFileSystemConfig setMinBackoffDelay(Duration minBackoffDelay)
+ {
+ this.minBackoffDelay = minBackoffDelay;
+ return this;
+ }
+
+ @NotNull
+ @MinDuration("0ms")
+ public Duration getMaxBackoffDelay()
+ {
+ return maxBackoffDelay;
+ }
+
+ @Config("gcs.client.max-backoff-delay")
+ @ConfigDescription("Maximum delay between RPC retries.")
+ public GcsFileSystemConfig setMaxBackoffDelay(Duration maxBackoffDelay)
+ {
+ this.maxBackoffDelay = maxBackoffDelay;
+ return this;
+ }
+
+ @AssertTrue(message = "gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay")
+ public boolean isRetryDelayValid()
+ {
+ return minBackoffDelay == null || maxBackoffDelay == null || minBackoffDelay.compareTo(maxBackoffDelay) <= 0; // a null delay is left to @NotNull on its getter instead of throwing here
+ }
+
public void validate()
{
- // This cannot be normal validation, as it would make it impossible to write TestHiveGcsConfig.testExplicitPropertyMappings
+ // This cannot be normal validation, as it would make it impossible to write TestGcsFileSystemConfig.testExplicitPropertyMappings
if (useGcsAccessToken) {
- checkState(jsonKey == null, "Cannot specify 'hive.gcs.json-key' when 'hive.gcs.use-access-token' is set");
- checkState(jsonKeyFilePath == null, "Cannot specify 'hive.gcs.json-key-file-path' when 'hive.gcs.use-access-token' is set");
+ checkState(jsonKey == null, "Cannot specify 'gcs.json-key' when 'gcs.use-access-token' is set");
+ checkState(jsonKeyFilePath == null, "Cannot specify 'gcs.json-key-file-path' when 'gcs.use-access-token' is set");
}
- checkState(jsonKey == null || jsonKeyFilePath == null, "'hive.gcs.json-key' and 'hive.gcs.json-key-file-path' cannot be both set");
+ checkState(jsonKey == null || jsonKeyFilePath == null, "'gcs.json-key' and 'gcs.json-key-file-path' cannot be both set");
}
}
diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java
index 176d45061fda..bdb85f82d7e2 100644
--- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java
+++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java
@@ -13,6 +13,7 @@
*/
package io.trino.filesystem.gcs;
+import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
@@ -20,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.spi.security.ConnectorIdentity;
+import org.threeten.bp.Duration;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
@@ -29,6 +31,7 @@
import java.util.List;
import java.util.Optional;
+import static com.google.cloud.storage.StorageRetryStrategy.getUniformStorageRetryStrategy;
import static com.google.common.base.Strings.nullToEmpty;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -39,6 +42,11 @@ public class GcsStorageFactory
private final String projectId;
private final boolean useGcsAccessToken;
private final Optional jsonGoogleCredential;
+ private final int maxRetries;
+ private final double backoffScaleFactor;
+ private final Duration maxRetryTime;
+ private final Duration minBackoffDelay;
+ private final Duration maxBackoffDelay;
@Inject
public GcsStorageFactory(GcsFileSystemConfig config)
@@ -62,6 +70,12 @@ else if (jsonKeyFilePath != null) {
else {
jsonGoogleCredential = Optional.empty();
}
+ this.maxRetries = config.getMaxRetries();
+ this.backoffScaleFactor = config.getBackoffScaleFactor();
+ // Convert through milliseconds so this class does not also have to import io.airlift.units.Duration, whose name collides with org.threeten.bp.Duration
+ this.maxRetryTime = Duration.ofMillis(config.getMaxRetryTime().toMillis());
+ this.minBackoffDelay = Duration.ofMillis(config.getMinBackoffDelay().toMillis());
+ this.maxBackoffDelay = Duration.ofMillis(config.getMaxBackoffDelay().toMillis());
}
public Storage create(ConnectorIdentity identity)
@@ -81,7 +95,20 @@ public Storage create(ConnectorIdentity identity)
if (projectId != null) {
storageOptionsBuilder.setProjectId(projectId);
}
- return storageOptionsBuilder.setCredentials(credentials).build().getService();
+ // Note: the default retry strategy skips operations it cannot prove idempotent; the uniform strategy retries every operation.
+ // The trino-filesystem API does not violate the conditions for idempotency, see https://cloud.google.com/storage/docs/retry-strategy#java for details.
+ return storageOptionsBuilder
+ .setCredentials(credentials)
+ .setStorageRetryStrategy(getUniformStorageRetryStrategy())
+ .setRetrySettings(RetrySettings.newBuilder()
+ .setMaxAttempts(maxRetries + 1)
+ .setRetryDelayMultiplier(backoffScaleFactor)
+ .setTotalTimeout(maxRetryTime)
+ .setInitialRetryDelay(minBackoffDelay)
+ .setMaxRetryDelay(maxBackoffDelay)
+ .build())
+ .build()
+ .getService();
}
catch (IOException e) {
throw new UncheckedIOException(e);
diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java
index 2c6dd4a9992b..d5b640f8f738 100644
--- a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java
+++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemConfig.java
@@ -15,6 +15,8 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
+import jakarta.validation.constraints.AssertTrue;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -25,6 +27,10 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static io.airlift.testing.ValidationAssertions.assertFailsValidation;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class TestGcsFileSystemConfig
@@ -33,14 +39,19 @@ public class TestGcsFileSystemConfig
void testDefaults()
{
assertRecordedDefaults(recordDefaults(GcsFileSystemConfig.class)
- .setReadBlockSize(DataSize.of(2, DataSize.Unit.MEGABYTE))
- .setWriteBlockSize(DataSize.of(16, DataSize.Unit.MEGABYTE))
+ .setReadBlockSize(DataSize.of(2, MEGABYTE))
+ .setWriteBlockSize(DataSize.of(16, MEGABYTE))
.setPageSize(100)
.setBatchSize(100)
.setProjectId(null)
.setUseGcsAccessToken(false)
.setJsonKey(null)
- .setJsonKeyFilePath(null));
+ .setJsonKeyFilePath(null)
+ .setMaxRetries(20)
+ .setBackoffScaleFactor(2.0)
+ .setMaxRetryTime(new Duration(20, SECONDS))
+ .setMinBackoffDelay(new Duration(10, MILLISECONDS))
+ .setMaxBackoffDelay(new Duration(1100, MILLISECONDS)));
}
@Test
@@ -58,17 +69,27 @@ void testExplicitPropertyMappings()
.put("gcs.use-access-token", "true")
.put("gcs.json-key", "{}")
.put("gcs.json-key-file-path", jsonKeyFile.toString())
+ .put("gcs.client.max-retries", "10")
+ .put("gcs.client.backoff-scale-factor", "3.0")
+ .put("gcs.client.max-retry-time", "10s")
+ .put("gcs.client.min-backoff-delay", "20ms")
+ .put("gcs.client.max-backoff-delay", "20ms")
.buildOrThrow();
GcsFileSystemConfig expected = new GcsFileSystemConfig()
- .setReadBlockSize(DataSize.of(51, DataSize.Unit.MEGABYTE))
- .setWriteBlockSize(DataSize.of(52, DataSize.Unit.MEGABYTE))
+ .setReadBlockSize(DataSize.of(51, MEGABYTE))
+ .setWriteBlockSize(DataSize.of(52, MEGABYTE))
.setPageSize(10)
.setBatchSize(11)
.setProjectId("project")
.setUseGcsAccessToken(true)
.setJsonKey("{}")
- .setJsonKeyFilePath(jsonKeyFile.toString());
+ .setJsonKeyFilePath(jsonKeyFile.toString())
+ .setMaxRetries(10)
+ .setBackoffScaleFactor(3.0)
+ .setMaxRetryTime(new Duration(10, SECONDS))
+ .setMinBackoffDelay(new Duration(20, MILLISECONDS))
+ .setMaxBackoffDelay(new Duration(20, MILLISECONDS));
assertFullMapping(properties, expected);
}
@@ -80,20 +101,29 @@ public void testValidation()
.setUseGcsAccessToken(true)
.setJsonKey("{}}")::validate)
.isInstanceOf(IllegalStateException.class)
- .hasMessage("Cannot specify 'hive.gcs.json-key' when 'hive.gcs.use-access-token' is set");
+ .hasMessage("Cannot specify 'gcs.json-key' when 'gcs.use-access-token' is set");
assertThatThrownBy(
new GcsFileSystemConfig()
.setUseGcsAccessToken(true)
.setJsonKeyFilePath("/dev/null")::validate)
.isInstanceOf(IllegalStateException.class)
- .hasMessage("Cannot specify 'hive.gcs.json-key-file-path' when 'hive.gcs.use-access-token' is set");
+ .hasMessage("Cannot specify 'gcs.json-key-file-path' when 'gcs.use-access-token' is set");
assertThatThrownBy(
new GcsFileSystemConfig()
.setJsonKey("{}")
.setJsonKeyFilePath("/dev/null")::validate)
.isInstanceOf(IllegalStateException.class)
- .hasMessage("'hive.gcs.json-key' and 'hive.gcs.json-key-file-path' cannot be both set");
+ .hasMessage("'gcs.json-key' and 'gcs.json-key-file-path' cannot be both set");
+
+ assertFailsValidation(
+ new GcsFileSystemConfig()
+ .setJsonKey("{}")
+ .setMinBackoffDelay(new Duration(20, MILLISECONDS))
+ .setMaxBackoffDelay(new Duration(19, MILLISECONDS)),
+ "retryDelayValid",
+ "gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay",
+ AssertTrue.class);
}
}
diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java
index b9a056e487fc..852b30da274b 100644
--- a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java
+++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsFileSystemGcs.java
@@ -13,10 +13,16 @@
*/
package io.trino.filesystem.gcs;
+import io.trino.filesystem.TrinoOutputFile;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import java.io.IOException;
+import java.io.OutputStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThatNoException;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestGcsFileSystemGcs
@@ -28,4 +34,16 @@ void setup()
{
initialize(getRequiredEnvironmentVariable("GCP_CREDENTIALS_KEY"));
}
+
+ @Test
+ void testCreateFileRetry()
+ {
+ assertThatNoException().isThrownBy(() -> {
+ for (int i = 1; i <= 100; i++) { // overwrites the same "testFile" location 100 times back to back; presumably meant to provoke retryable GCS errors - TODO confirm
+ TrinoOutputFile outputFile = getFileSystem().newOutputFile(getRootLocation().appendPath("testFile"));
+ try (OutputStream out = outputFile.createOrOverwrite()) {
+ out.write("test".getBytes(UTF_8));
+ }
+ }});
+ }
}
diff --git a/plugin/trino-bigquery/pom.xml b/plugin/trino-bigquery/pom.xml
index 2e339e1349ce..446a8930c7a4 100644
--- a/plugin/trino-bigquery/pom.xml
+++ b/plugin/trino-bigquery/pom.xml
@@ -41,12 +41,6 @@
conscrypt-openjdk-uber
2.5.2
-
-
- org.threeten
- threetenbp
- 1.6.8
-
diff --git a/pom.xml b/pom.xml
index 81777afc45f8..50b4e9813ee3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2051,6 +2051,12 @@
1.13.1
+
+ org.threeten
+ threetenbp
+ 1.6.8
+
+
org.xerial.snappy
snappy-java