Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,20 @@ public S3Client s3() {
/**
 * Builds the asynchronous S3 client.
 *
 * <p>When {@code s3FileIOProperties.isS3CRTEnabled()} is true, a CRT-based builder is used and,
 * in addition to the mutations listed next, receives {@code applyS3CrtConfigurations}; otherwise
 * the standard async builder is used. Both chained builders apply the assume-role, client-region,
 * client-credential and endpoint mutations before {@code build()}.
 */
@Override
public S3AsyncClient s3Async() {
if (s3FileIOProperties.isS3CRTEnabled()) {
// NOTE(review): this text is a diff rendering without +/- markers. The one-line return directly
// below appears to be the pre-change line and the chained return after it its replacement (the
// hunk header's 9 -> 20 line counts are consistent with that reading) -- confirm.
return S3AsyncClient.crtBuilder().applyMutation(this::applyAssumeRoleConfigurations).build();
return S3AsyncClient.crtBuilder()
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
.build();
}
// NOTE(review): same as above -- the one-line return appears to be the pre-change line, followed
// by its chained replacement for the non-CRT path.
return S3AsyncClient.builder().applyMutation(this::applyAssumeRoleConfigurations).build();
return S3AsyncClient.builder()
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}

@Override
Expand Down
13 changes: 11 additions & 2 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,18 @@ public S3Client s3() {
/**
 * Builds the asynchronous S3 client.
 *
 * <p>When {@code s3FileIOProperties.isS3CRTEnabled()} is true, a CRT-based builder is used and
 * additionally receives {@code applyS3CrtConfigurations}; otherwise the standard async builder is
 * used. Both chained builders apply the client-region, client-credential and endpoint mutations
 * before {@code build()}.
 */
@Override
public S3AsyncClient s3Async() {
if (s3FileIOProperties.isS3CRTEnabled()) {
// NOTE(review): this text is a diff rendering without +/- markers. The one-line return directly
// below appears to be the pre-change line and the chained return after it its replacement (the
// hunk header's 9 -> 18 line counts are consistent with that reading) -- confirm.
return S3AsyncClient.crtBuilder().build();
return S3AsyncClient.crtBuilder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
.build();
}
// NOTE(review): same as above -- the one-line return appears to be the pre-change line, followed
// by its chained replacement for the non-CRT path.
return S3AsyncClient.builder().build();
return S3AsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}

@Override
Expand Down
31 changes: 31 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;

public class AwsClientProperties implements Serializable {
/**
Expand Down Expand Up @@ -135,6 +136,21 @@ public <T extends AwsClientBuilder> void applyClientRegionConfiguration(T builde
}
}

/**
 * Set the region on an S3 CRT async client builder.
 *
 * <p>The builder is left untouched when no client region has been configured.
 *
 * <p>Sample usage:
 *
 * <pre>
 *     S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientRegionConfiguration)
 * </pre>
 */
public <T extends S3CrtAsyncClientBuilder> void applyClientRegionConfiguration(T builder) {
  if (clientRegion == null) {
    return;
  }

  Region region = Region.of(clientRegion);
  builder.region(region);
}

/**
* Configure the credential provider for AWS clients.
*
Expand All @@ -150,6 +166,21 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
}
}

/**
 * Set the credentials provider on an S3 CRT async client builder.
 *
 * <p>The builder is left untouched when the client credentials provider property is null or
 * empty.
 *
 * <p>Sample usage:
 *
 * <pre>
 *     S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientCredentialConfigurations)
 * </pre>
 */
public <T extends S3CrtAsyncClientBuilder> void applyClientCredentialConfigurations(T builder) {
  if (Strings.isNullOrEmpty(this.clientCredentialsProvider)) {
    return;
  }

  builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider));
}

/**
* Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an
* instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,17 @@ public S3Client s3() {
/**
 * Builds the asynchronous S3 client.
 *
 * <p>When {@code s3FileIOProperties.isS3CRTEnabled()} is true, a CRT-based builder is used and
 * additionally receives {@code applyS3CrtConfigurations}; otherwise the standard async builder is
 * used. Both chained builders apply the client-region, client-credential and endpoint mutations
 * before {@code build()}.
 */
@Override
public S3AsyncClient s3Async() {
if (s3FileIOProperties.isS3CRTEnabled()) {
// NOTE(review): this text is a diff rendering without +/- markers. The one-line return directly
// below appears to be the pre-change line and the chained return after it its replacement (the
// hunk header's 8 -> 17 line counts are consistent with that reading) -- confirm.
return S3AsyncClient.crtBuilder().build();
return S3AsyncClient.crtBuilder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
.build();
}
// NOTE(review): same as above -- the one-line return appears to be the pre-change line, followed
// by its chained replacement for the non-CRT path.
return S3AsyncClient.builder().build();
return S3AsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@
import software.amazon.awssdk.core.retry.conditions.RetryCondition;
import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition;
import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Tag;

Expand Down Expand Up @@ -98,6 +100,17 @@ public class S3FileIOProperties implements Serializable {

public static final boolean S3_CRT_ENABLED_DEFAULT = true;

/** This property is used to specify the max concurrency for S3 CRT clients. */
public static final String S3_CRT_MAX_CONCURRENCY = "s3.crt.max-concurrency";

/**
* Default max concurrency for S3 CRT clients.
*
* <p>A higher concurrency is recommended in order to fully benefit from the
* analytics-accelerator-s3 library, where this S3 CRT client is used.
*
* <p>For more details, see: https://github.com/awslabs/analytics-accelerator-s3
*/
public static final int S3_CRT_MAX_CONCURRENCY_DEFAULT = 500;

/**
* The fallback-to-iam property allows users to customize whether or not they would like their
* jobs fall back to the Job Execution IAM role in case they get an Access Denied from the S3
Expand Down Expand Up @@ -513,6 +526,7 @@ public class S3FileIOProperties implements Serializable {
private final boolean isS3AnalyticsAcceleratorEnabled;
private final Map<String, String> s3AnalyticsacceleratorProperties;
private final boolean isS3CRTEnabled;
private final int s3CrtMaxConcurrency;
private String writeStorageClass;
private int s3RetryNumRetries;
private long s3RetryMinWaitMs;
Expand Down Expand Up @@ -560,6 +574,7 @@ public S3FileIOProperties() {
this.isS3AnalyticsAcceleratorEnabled = S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT;
this.s3AnalyticsacceleratorProperties = Maps.newHashMap();
this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT;
this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT;
this.allProperties = Maps.newHashMap();

ValidationException.check(
Expand Down Expand Up @@ -679,6 +694,9 @@ public S3FileIOProperties(Map<String, String> properties) {
PropertyUtil.propertiesWithPrefix(properties, S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX);
this.isS3CRTEnabled =
PropertyUtil.propertyAsBoolean(properties, S3_CRT_ENABLED, S3_CRT_ENABLED_DEFAULT);
this.s3CrtMaxConcurrency =
PropertyUtil.propertyAsInt(
properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT);

ValidationException.check(
keyIdAccessKeyBothConfigured(),
Expand Down Expand Up @@ -805,6 +823,10 @@ public boolean isS3CRTEnabled() {
return isS3CRTEnabled;
}

/** Returns the max concurrency configured for S3 CRT clients. */
public int s3CrtMaxConcurrency() {
  return this.s3CrtMaxConcurrency;
}

/** Returns the configured endpoint value. */
public String endpoint() {
  return endpoint;
}
Expand Down Expand Up @@ -1004,6 +1026,36 @@ public <T extends S3ClientBuilder> void applyEndpointConfigurations(T builder) {
}
}

/**
 * Override the endpoint of an S3 async client builder.
 *
 * <p>The builder is left untouched when no endpoint has been configured.
 *
 * <p>Sample usage:
 *
 * <pre>
 *     S3AsyncClient.builder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
 * </pre>
 */
public <T extends S3AsyncClientBuilder> void applyEndpointConfigurations(T builder) {
  if (endpoint == null) {
    return;
  }

  URI endpointUri = URI.create(endpoint);
  builder.endpointOverride(endpointUri);
}

/**
 * Override the endpoint of an S3 CRT async client builder.
 *
 * <p>The builder is left untouched when no endpoint has been configured.
 *
 * <p>Sample usage:
 *
 * <pre>
 *     S3AsyncClient.crtBuilder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
 * </pre>
 */
public <T extends S3CrtAsyncClientBuilder> void applyEndpointConfigurations(T builder) {
  if (endpoint == null) {
    return;
  }

  URI endpointUri = URI.create(endpoint);
  builder.endpointOverride(endpointUri);
}

/**
* Override the retry configurations for an S3 client.
*
Expand Down Expand Up @@ -1098,6 +1150,10 @@ public <T extends S3ClientBuilder> void applyUserAgentConfigurations(T builder)
.build());
}

/**
 * Apply S3 CRT specific settings to an S3 CRT async client builder; this sets the max
 * concurrency returned by {@link #s3CrtMaxConcurrency()}.
 *
 * <p>Sample usage:
 *
 * <pre>
 *     S3AsyncClient.crtBuilder().applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
 * </pre>
 *
 * @param builder the S3 CRT async client builder to configure
 * @return the value returned by {@code builder.maxConcurrency(...)}
 */
public S3CrtAsyncClientBuilder applyS3CrtConfigurations(S3CrtAsyncClientBuilder builder) {
  int maxConcurrency = s3CrtMaxConcurrency();
  return builder.maxConcurrency(maxConcurrency);
}

/**
* Dynamically load the http client builder to avoid runtime deps requirements of any optional SDK
* Plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.mockito.Mockito;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Tag;

Expand All @@ -51,6 +53,7 @@ public class TestS3FileIOProperties {
private static final String S3_DELETE_TAG_KEY = "my_key";
private static final String S3_DELETE_TAG_VALUE = "my_value";

@SuppressWarnings("MethodLength")
@Test
public void testS3FileIOPropertiesDefaultValues() {
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties();
Expand Down Expand Up @@ -122,8 +125,18 @@ public void testS3FileIOPropertiesDefaultValues() {
.isEqualTo(s3FileIOProperties.isDeleteEnabled());

assertThat(Collections.emptyMap()).isEqualTo(s3FileIOProperties.bucketToAccessPointMapping());

assertThat(S3FileIOProperties.S3_CRT_MAX_CONCURRENCY_DEFAULT)
.isEqualTo(s3FileIOProperties.s3CrtMaxConcurrency());

assertThat(S3FileIOProperties.S3_CRT_ENABLED_DEFAULT)
.isEqualTo(s3FileIOProperties.isS3CRTEnabled());

assertThat(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)
.isEqualTo(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled());
}

@SuppressWarnings("MethodLength")
@Test
public void testS3FileIOProperties() {
Map<String, String> map = getTestProperties();
Expand Down Expand Up @@ -265,6 +278,20 @@ public void testS3FileIOProperties() {
String.valueOf(s3FileIOProperties.isRemoteSigningEnabled()));

assertThat(map).containsEntry(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING");

assertThat(map)
.containsEntry(
S3FileIOProperties.S3_CRT_MAX_CONCURRENCY,
String.valueOf(s3FileIOProperties.s3CrtMaxConcurrency()));

assertThat(map)
.containsEntry(
S3FileIOProperties.S3_CRT_ENABLED, String.valueOf(s3FileIOProperties.isS3CRTEnabled()));

assertThat(map)
.containsEntry(
S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED,
String.valueOf(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()));
}

@Test
Expand Down Expand Up @@ -412,6 +439,9 @@ private Map<String, String> getTestProperties() {
map.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true");
map.put(S3FileIOProperties.REMOTE_SIGNING_ENABLED, "true");
map.put(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING");
map.put(S3FileIOProperties.S3_CRT_MAX_CONCURRENCY, "200");
map.put(S3FileIOProperties.S3_CRT_ENABLED, "false");
map.put(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, "true");
return map;
}

Expand Down Expand Up @@ -489,9 +519,17 @@ public void testApplyEndpointConfiguration() {
properties.put(S3FileIOProperties.ENDPOINT, "endpoint");
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);
S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
S3AsyncClientBuilder mockS3AsyncClientBuilder = Mockito.mock(S3AsyncClientBuilder.class);
S3CrtAsyncClientBuilder mockS3CrtAsyncClientBuilder =
Mockito.mock(S3CrtAsyncClientBuilder.class);

s3FileIOProperties.applyEndpointConfigurations(mockS3ClientBuilder);
s3FileIOProperties.applyEndpointConfigurations(mockS3AsyncClientBuilder);
s3FileIOProperties.applyEndpointConfigurations(mockS3CrtAsyncClientBuilder);

Mockito.verify(mockS3ClientBuilder).endpointOverride(Mockito.any(URI.class));
Mockito.verify(mockS3AsyncClientBuilder).endpointOverride(Mockito.any(URI.class));
Mockito.verify(mockS3CrtAsyncClientBuilder).endpointOverride(Mockito.any(URI.class));
}

@Test
Expand All @@ -505,6 +543,17 @@ public void testApplyUserAgentConfigurations() {
.overrideConfiguration(Mockito.any(ClientOverrideConfiguration.class));
}

/**
 * Verifies that {@code applyS3CrtConfigurations} forwards the exact max concurrency to the CRT
 * builder: the default when the property is absent, and the configured value when it is set.
 */
@Test
public void testApplyS3CrtConfigurations() {
  // Property absent: the builder must receive the default, not just "some Integer".
  Map<String, String> defaultProperties = Maps.newHashMap();
  S3FileIOProperties defaultS3FileIOProperties = new S3FileIOProperties(defaultProperties);
  S3CrtAsyncClientBuilder defaultMockBuilder = Mockito.mock(S3CrtAsyncClientBuilder.class);
  defaultS3FileIOProperties.applyS3CrtConfigurations(defaultMockBuilder);

  Mockito.verify(defaultMockBuilder)
      .maxConcurrency(S3FileIOProperties.S3_CRT_MAX_CONCURRENCY_DEFAULT);

  // Property set explicitly: the configured value must be forwarded unchanged.
  Map<String, String> customProperties = Maps.newHashMap();
  customProperties.put(S3FileIOProperties.S3_CRT_MAX_CONCURRENCY, "200");
  S3FileIOProperties customS3FileIOProperties = new S3FileIOProperties(customProperties);
  S3CrtAsyncClientBuilder customMockBuilder = Mockito.mock(S3CrtAsyncClientBuilder.class);
  customS3FileIOProperties.applyS3CrtConfigurations(customMockBuilder);

  Mockito.verify(customMockBuilder).maxConcurrency(200);
}

@Test
public void testApplyRetryConfiguration() {
Map<String, String> properties = Maps.newHashMap();
Expand Down
Loading