Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/trino-filesystem-gcs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
Expand All @@ -141,6 +146,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.configuration.validation.FileExists;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

Expand All @@ -37,6 +42,12 @@ public class GcsFileSystemConfig
private boolean useGcsAccessToken;
private String jsonKey;
private String jsonKeyFilePath;
private int maxRetries = 20;
private double backoffScaleFactor = 2.0;
private Duration maxRetryTime = new Duration(20, TimeUnit.SECONDS);
private Duration minBackoffDelay = new Duration(10, TimeUnit.MILLISECONDS);
// Note: there is no benefit to setting this much higher as the rpc quota is 1x per second: https://cloud.google.com/storage/docs/retry-strategy#java
private Duration maxBackoffDelay = new Duration(1100, TimeUnit.MILLISECONDS);

@NotNull
public DataSize getReadBlockSize()
Expand Down Expand Up @@ -148,14 +159,92 @@ public GcsFileSystemConfig setJsonKeyFilePath(String jsonKeyFilePath)
return this;
}

@Min(0)
public int getMaxRetries()
{
return maxRetries;
}

@Config("gcs.client.max-retries")
@ConfigDescription("Maximum number of RPC attempts")
public GcsFileSystemConfig setMaxRetries(int maxRetries)
{
this.maxRetries = maxRetries;
return this;
}

@Min(1)
public double getBackoffScaleFactor()
{
return backoffScaleFactor;
}

@Config("gcs.client.backoff-scale-factor")
@ConfigDescription("Scale factor for RPC retry delay")
public GcsFileSystemConfig setBackoffScaleFactor(double backoffScaleFactor)
{
this.backoffScaleFactor = backoffScaleFactor;
return this;
}

@NotNull
public Duration getMaxRetryTime()
{
return maxRetryTime;
}

@Config("gcs.client.max-retry-time")
@ConfigDescription("Total time limit for an RPC to be retried")
public GcsFileSystemConfig setMaxRetryTime(Duration maxRetryTime)
{
this.maxRetryTime = maxRetryTime;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getMinBackoffDelay()
{
return minBackoffDelay;
}

@Config("gcs.client.min-backoff-delay")
@ConfigDescription("Minimum delay between RPC retries")
public GcsFileSystemConfig setMinBackoffDelay(Duration minBackoffDelay)
{
this.minBackoffDelay = minBackoffDelay;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getMaxBackoffDelay()
{
return maxBackoffDelay;
}

@Config("gcs.client.max-backoff-delay")
@ConfigDescription("Maximum delay between RPC retries.")
public GcsFileSystemConfig setMaxBackoffDelay(Duration maxBackoffDelay)
{
this.maxBackoffDelay = maxBackoffDelay;
return this;
}

@AssertTrue(message = "gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay")
public boolean isRetryDelayValid()
{
return minBackoffDelay.compareTo(maxBackoffDelay) <= 0;
}

public void validate()
{
// This cannot be normal validation, as it would make it impossible to write TestHiveGcsConfig.testExplicitPropertyMappings
// This cannot be normal validation, as it would make it impossible to write TestGcsFileSystemConfig.testExplicitPropertyMappings

if (useGcsAccessToken) {
checkState(jsonKey == null, "Cannot specify 'hive.gcs.json-key' when 'hive.gcs.use-access-token' is set");
checkState(jsonKeyFilePath == null, "Cannot specify 'hive.gcs.json-key-file-path' when 'hive.gcs.use-access-token' is set");
checkState(jsonKey == null, "Cannot specify 'gcs.json-key' when 'gcs.use-access-token' is set");
checkState(jsonKeyFilePath == null, "Cannot specify 'gcs.json-key-file-path' when 'gcs.use-access-token' is set");
}
checkState(jsonKey == null || jsonKeyFilePath == null, "'hive.gcs.json-key' and 'hive.gcs.json-key-file-path' cannot be both set");
checkState(jsonKey == null || jsonKeyFilePath == null, "'gcs.json-key' and 'gcs.json-key-file-path' cannot be both set");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package io.trino.filesystem.gcs;

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.spi.security.ConnectorIdentity;
import org.threeten.bp.Duration;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
Expand All @@ -29,6 +31,7 @@
import java.util.List;
import java.util.Optional;

import static com.google.cloud.storage.StorageRetryStrategy.getUniformStorageRetryStrategy;
import static com.google.common.base.Strings.nullToEmpty;
import static java.nio.charset.StandardCharsets.UTF_8;

Expand All @@ -39,6 +42,11 @@ public class GcsStorageFactory
private final String projectId;
private final boolean useGcsAccessToken;
private final Optional<GoogleCredentials> jsonGoogleCredential;
private final int maxRetries;
private final double backoffScaleFactor;
private final Duration maxRetryTime;
private final Duration minBackoffDelay;
private final Duration maxBackoffDelay;

@Inject
public GcsStorageFactory(GcsFileSystemConfig config)
Expand All @@ -62,6 +70,12 @@ else if (jsonKeyFilePath != null) {
else {
jsonGoogleCredential = Optional.empty();
}
this.maxRetries = config.getMaxRetries();
this.backoffScaleFactor = config.getBackoffScaleFactor();
// To avoid name collision by importing io.airlift.Duration
this.maxRetryTime = Duration.ofMillis(config.getMaxRetryTime().toMillis());
this.minBackoffDelay = Duration.ofMillis(config.getMinBackoffDelay().toMillis());
this.maxBackoffDelay = Duration.ofMillis(config.getMaxBackoffDelay().toMillis());
}

public Storage create(ConnectorIdentity identity)
Expand All @@ -81,7 +95,20 @@ public Storage create(ConnectorIdentity identity)
if (projectId != null) {
storageOptionsBuilder.setProjectId(projectId);
}
return storageOptionsBuilder.setCredentials(credentials).build().getService();
// Note: without uniform strategy we cannot retry idempotent operations.
// The trino-filesystem api does not violate the conditions for idempotency, see https://cloud.google.com/storage/docs/retry-strategy#java for details.
return storageOptionsBuilder
.setCredentials(credentials)
.setStorageRetryStrategy(getUniformStorageRetryStrategy())
.setRetrySettings(RetrySettings.newBuilder()
.setMaxAttempts(maxRetries + 1)
.setRetryDelayMultiplier(backoffScaleFactor)
.setTotalTimeout(maxRetryTime)
.setInitialRetryDelay(minBackoffDelay)
.setMaxRetryDelay(maxBackoffDelay)
.build())
.build()
.getService();
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import jakarta.validation.constraints.AssertTrue;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -25,6 +27,10 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.testing.ValidationAssertions.assertFailsValidation;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestGcsFileSystemConfig
Expand All @@ -33,14 +39,19 @@ public class TestGcsFileSystemConfig
void testDefaults()
{
assertRecordedDefaults(recordDefaults(GcsFileSystemConfig.class)
.setReadBlockSize(DataSize.of(2, DataSize.Unit.MEGABYTE))
.setWriteBlockSize(DataSize.of(16, DataSize.Unit.MEGABYTE))
.setReadBlockSize(DataSize.of(2, MEGABYTE))
.setWriteBlockSize(DataSize.of(16, MEGABYTE))
.setPageSize(100)
.setBatchSize(100)
.setProjectId(null)
.setUseGcsAccessToken(false)
.setJsonKey(null)
.setJsonKeyFilePath(null));
.setJsonKeyFilePath(null)
.setMaxRetries(20)
.setBackoffScaleFactor(2.0)
.setMaxRetryTime(new Duration(20, SECONDS))
.setMinBackoffDelay(new Duration(10, MILLISECONDS))
.setMaxBackoffDelay(new Duration(1100, MILLISECONDS)));
}

@Test
Expand All @@ -58,17 +69,27 @@ void testExplicitPropertyMappings()
.put("gcs.use-access-token", "true")
.put("gcs.json-key", "{}")
.put("gcs.json-key-file-path", jsonKeyFile.toString())
.put("gcs.client.max-retries", "10")
.put("gcs.client.backoff-scale-factor", "3.0")
.put("gcs.client.max-retry-time", "10s")
.put("gcs.client.min-backoff-delay", "20ms")
.put("gcs.client.max-backoff-delay", "20ms")
.buildOrThrow();

GcsFileSystemConfig expected = new GcsFileSystemConfig()
.setReadBlockSize(DataSize.of(51, DataSize.Unit.MEGABYTE))
.setWriteBlockSize(DataSize.of(52, DataSize.Unit.MEGABYTE))
.setReadBlockSize(DataSize.of(51, MEGABYTE))
.setWriteBlockSize(DataSize.of(52, MEGABYTE))
.setPageSize(10)
.setBatchSize(11)
.setProjectId("project")
.setUseGcsAccessToken(true)
.setJsonKey("{}")
.setJsonKeyFilePath(jsonKeyFile.toString());
.setJsonKeyFilePath(jsonKeyFile.toString())
.setMaxRetries(10)
.setBackoffScaleFactor(3.0)
.setMaxRetryTime(new Duration(10, SECONDS))
.setMinBackoffDelay(new Duration(20, MILLISECONDS))
.setMaxBackoffDelay(new Duration(20, MILLISECONDS));
assertFullMapping(properties, expected);
}

Expand All @@ -80,20 +101,29 @@ public void testValidation()
.setUseGcsAccessToken(true)
.setJsonKey("{}}")::validate)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Cannot specify 'hive.gcs.json-key' when 'hive.gcs.use-access-token' is set");
.hasMessage("Cannot specify 'gcs.json-key' when 'gcs.use-access-token' is set");

assertThatThrownBy(
new GcsFileSystemConfig()
.setUseGcsAccessToken(true)
.setJsonKeyFilePath("/dev/null")::validate)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Cannot specify 'hive.gcs.json-key-file-path' when 'hive.gcs.use-access-token' is set");
.hasMessage("Cannot specify 'gcs.json-key-file-path' when 'gcs.use-access-token' is set");

assertThatThrownBy(
new GcsFileSystemConfig()
.setJsonKey("{}")
.setJsonKeyFilePath("/dev/null")::validate)
.isInstanceOf(IllegalStateException.class)
.hasMessage("'hive.gcs.json-key' and 'hive.gcs.json-key-file-path' cannot be both set");
.hasMessage("'gcs.json-key' and 'gcs.json-key-file-path' cannot be both set");

assertFailsValidation(
new GcsFileSystemConfig()
.setJsonKey("{}")
.setMinBackoffDelay(new Duration(20, MILLISECONDS))
.setMaxBackoffDelay(new Duration(19, MILLISECONDS)),
"retryDelayValid",
"gcs.client.min-backoff-delay must be less than or equal to gcs.client.max-backoff-delay",
AssertTrue.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@
*/
package io.trino.filesystem.gcs;

import io.trino.filesystem.TrinoOutputFile;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.io.OutputStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThatNoException;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestGcsFileSystemGcs
Expand All @@ -28,4 +34,16 @@ void setup()
{
initialize(getRequiredEnvironmentVariable("GCP_CREDENTIALS_KEY"));
}

@Test
void testCreateFileRetry()
{
assertThatNoException().isThrownBy(() -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by the purpose of assertThatNoException(). Any exception should fail the test. Isn't that the normal behavior of unit tests?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it captures intent nicely - for the case when you just want to check code runs fine and do not do any assertions.
Otherwise, if there is code without assertions in test method, it kinda looks that test writer forgot about sth.

for (int i = 1; i <= 100; i++) {
TrinoOutputFile outputFile = getFileSystem().newOutputFile(getRootLocation().appendPath("testFile"));
try (OutputStream out = outputFile.createOrOverwrite()) {
out.write("test".getBytes(UTF_8));
}
}});
}
}
6 changes: 0 additions & 6 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@
<artifactId>conscrypt-openjdk-uber</artifactId>
<version>2.5.2</version>
</dependency>

<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
<version>1.6.8</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,12 @@
<version>1.13.1</version>
</dependency>

<dependency>
<groupId>org.threeten</groupId>
<artifactId>threetenbp</artifactId>
<version>1.6.8</version>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down