@@ -58,9 +58,20 @@ public S3Client s3() {
@Override
public S3AsyncClient s3Async() {
if (s3FileIOProperties.isS3CRTEnabled()) {
return S3AsyncClient.crtBuilder().applyMutation(this::applyAssumeRoleConfigurations).build();
return S3AsyncClient.crtBuilder()
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
.build();
}
return S3AsyncClient.builder().applyMutation(this::applyAssumeRoleConfigurations).build();
return S3AsyncClient.builder()
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}

@Override
13 changes: 11 additions & 2 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
@@ -122,9 +122,18 @@ public S3Client s3() {
@Override
public S3AsyncClient s3Async() {
if (s3FileIOProperties.isS3CRTEnabled()) {
return S3AsyncClient.crtBuilder().build();
return S3AsyncClient.crtBuilder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
.build();
}
return S3AsyncClient.builder().build();
return S3AsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}

@Override
31 changes: 31 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
@@ -40,6 +40,7 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;

public class AwsClientProperties implements Serializable {
/**
@@ -135,6 +136,21 @@ public <T extends AwsClientBuilder> void applyClientRegionConfiguration(T builder) {
}
}

/**
* Configure a CRT client region.
*
* <p>Sample usage:
*
* <pre>
* S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientRegionConfiguration)
* </pre>
*/
public <T extends S3CrtAsyncClientBuilder> void applyClientRegionConfiguration(T builder) {
if (clientRegion != null) {
builder.region(Region.of(clientRegion));
}
}

/**
* Configure the credential provider for AWS clients.
*
@@ -150,6 +166,21 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T builder) {
}
}

/**
* Configure the credential provider for CRT clients.
*
* <p>Sample usage:
*
* <pre>
* S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientCredentialConfigurations)
* </pre>
*/
public <T extends S3CrtAsyncClientBuilder> void applyClientCredentialConfigurations(T builder) {
if (!Strings.isNullOrEmpty(this.clientCredentialsProvider)) {
builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider));
}
}

/**
* Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an
* instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new
@@ -63,8 +63,17 @@ public S3Client s3() {
@Override
public S3AsyncClient s3Async() {
if (s3FileIOProperties.isS3CRTEnabled()) {
return S3AsyncClient.crtBuilder().build();
return S3AsyncClient.crtBuilder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
.build();
}
return S3AsyncClient.builder().build();
return S3AsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}
}
@@ -48,8 +48,10 @@
import software.amazon.awssdk.core.retry.conditions.RetryCondition;
import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition;
import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Tag;

@@ -98,6 +100,17 @@ public class S3FileIOProperties implements Serializable {

public static final boolean S3_CRT_ENABLED_DEFAULT = true;

/** This property is used to specify the max concurrency for CRT Async clients. */
public static final String S3_CRT_MAX_CONCURRENCY = "s3.crt.max-concurrency";

/**
* To fully benefit from the analytics-accelerator-s3 library where this S3 CRT client is used, it
* is recommended to initialize with higher concurrency.
*
* <p>For more details, see: https://github.com/awslabs/analytics-accelerator-s3
*/
public static final int S3_CRT_MAX_CONCURRENCY_DEFAULT = 500;

/**
* The fallback-to-iam property allows users to customize whether or not they would like their
* jobs fall back to the Job Execution IAM role in case they get an Access Denied from the S3
@@ -513,6 +526,7 @@ public class S3FileIOProperties implements Serializable {
private final boolean isS3AnalyticsAcceleratorEnabled;
private final Map<String, String> s3AnalyticsacceleratorProperties;
private final boolean isS3CRTEnabled;
private final int s3CrtMaxConcurrency;
private String writeStorageClass;
private int s3RetryNumRetries;
private long s3RetryMinWaitMs;
@@ -560,6 +574,7 @@ public S3FileIOProperties() {
this.isS3AnalyticsAcceleratorEnabled = S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT;
this.s3AnalyticsacceleratorProperties = Maps.newHashMap();
this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT;
this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT;
this.allProperties = Maps.newHashMap();

ValidationException.check(
@@ -679,6 +694,9 @@ public S3FileIOProperties(Map<String, String> properties) {
PropertyUtil.propertiesWithPrefix(properties, S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX);
this.isS3CRTEnabled =
PropertyUtil.propertyAsBoolean(properties, S3_CRT_ENABLED, S3_CRT_ENABLED_DEFAULT);
this.s3CrtMaxConcurrency =
PropertyUtil.propertyAsInt(
properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT);

ValidationException.check(
keyIdAccessKeyBothConfigured(),
@@ -805,6 +823,10 @@ public boolean isS3CRTEnabled() {
return isS3CRTEnabled;
}

public int s3CrtMaxConcurrency() {
return s3CrtMaxConcurrency;
}

public String endpoint() {
return this.endpoint;
}
@@ -1004,6 +1026,36 @@ public <T extends S3ClientBuilder> void applyEndpointConfigurations(T builder) {
}
}

/**
* Override the endpoint for an S3 async client.
*
* <p>Sample usage:
*
* <pre>
* S3AsyncClient.builder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
* </pre>
*/
public <T extends S3AsyncClientBuilder> void applyEndpointConfigurations(T builder) {
if (endpoint != null) {
builder.endpointOverride(URI.create(endpoint));
}
}

/**
* Override the endpoint for an S3 CRT async client.
*
* <p>Sample usage:
*
* <pre>
* S3AsyncClient.crtBuilder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
* </pre>
*/
public <T extends S3CrtAsyncClientBuilder> void applyEndpointConfigurations(T builder) {
if (endpoint != null) {
builder.endpointOverride(URI.create(endpoint));
}
}

/**
* Override the retry configurations for an S3 client.
*
@@ -1098,6 +1150,10 @@ public <T extends S3ClientBuilder> void applyUserAgentConfigurations(T builder)
.build());
}

public S3CrtAsyncClientBuilder applyS3CrtConfigurations(S3CrtAsyncClientBuilder builder) {
return builder.maxConcurrency(s3CrtMaxConcurrency());
}

/**
* Dynamically load the http client builder to avoid runtime deps requirements of any optional SDK
* Plugins
75 changes: 75 additions & 0 deletions docs/docs/aws.md
@@ -565,6 +565,81 @@ spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog

For more details on using S3 Acceleration, please refer to [Configuring fast, secure file transfers using Amazon S3 Transfer Acceleration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/transfer-acceleration.html).

### S3 Analytics Accelerator

The [Analytics Accelerator Library for Amazon S3](https://github.com/awslabs/analytics-accelerator-s3) helps you accelerate access to Amazon S3 data from your applications. This open-source solution reduces processing times and compute costs for your data analytics workloads.

To enable the S3 Analytics Accelerator Library in Iceberg, set the `s3.analytics-accelerator.enabled` catalog property to `true`. By default, this property is set to `false`.

For example, to use S3 Analytics Accelerator with Spark, you can start the Spark SQL shell with:
```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.analytics-accelerator.enabled=true
```

The Analytics Accelerator Library can work with either the [S3 CRT client](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/crt-based-s3-client.html) or the [S3AsyncClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3AsyncClient.html). The library recommends that you use the S3 CRT client due to its enhanced connection pool management and [higher throughput on downloads](https://aws.amazon.com/blogs/developer/introducing-crt-based-s3-client-and-the-s3-transfer-manager-in-the-aws-sdk-for-java-2-x/).

##### Client Configuration

| Property | Default | Description |
|------------------------|---------|--------------------------------------------------------------|
| s3.crt.enabled         | `true`  | Controls whether the S3 async clients are created using CRT  |
| s3.crt.max-concurrency | `500` | Max concurrency for S3 CRT Async clients |
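
For example, to keep the CRT async client enabled while raising its maximum concurrency, both properties can be passed as catalog properties next to the accelerator flag. The sketch below reuses the catalog name and warehouse path from the example above; the concurrency value is illustrative, not a recommendation:

```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.analytics-accelerator.enabled=true \
--conf spark.sql.catalog.my_catalog.s3.crt.enabled=true \
--conf spark.sql.catalog.my_catalog.s3.crt.max-concurrency=1000
```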

Additional library-specific configurations are organized into the following sections:

##### Logical IO Configuration

| Property | Default | Description |
|------------------------------------------------------------------------|-----------------------|----------------------------------------------------------------------------|
| s3.analytics-accelerator.logicalio.prefetch.footer.enabled | `true` | Controls whether footer prefetching is enabled |
| s3.analytics-accelerator.logicalio.prefetch.page.index.enabled | `true` | Controls whether page index prefetching is enabled |
| s3.analytics-accelerator.logicalio.prefetch.file.metadata.size | `32KB` | Size of metadata to prefetch for regular files |
| s3.analytics-accelerator.logicalio.prefetch.large.file.metadata.size | `1MB` | Size of metadata to prefetch for large files |
| s3.analytics-accelerator.logicalio.prefetch.file.page.index.size | `1MB` | Size of page index to prefetch for regular files |
| s3.analytics-accelerator.logicalio.prefetch.large.file.page.index.size | `8MB` | Size of page index to prefetch for large files |
| s3.analytics-accelerator.logicalio.large.file.size | `1GB` | Threshold to consider a file as large |
| s3.analytics-accelerator.logicalio.small.objects.prefetching.enabled | `true` | Controls prefetching for small objects |
| s3.analytics-accelerator.logicalio.small.object.size.threshold | `3MB` | Size threshold for small object prefetching |
| s3.analytics-accelerator.logicalio.parquet.metadata.store.size | `45` | Size of the parquet metadata store |
| s3.analytics-accelerator.logicalio.max.column.access.store.size | `15` | Maximum size of column access store |
| s3.analytics-accelerator.logicalio.parquet.format.selector.regex | `^.*.(parquet\|par)$` | Regex pattern to identify parquet files |
| s3.analytics-accelerator.logicalio.prefetching.mode | `ROW_GROUP` | Prefetching mode (valid values: `OFF`, `ALL`, `ROW_GROUP`, `COLUMN_BOUND`) |

##### Physical IO Configuration

| Property | Default | Description |
|--------------------------------------------------------------|---------|---------------------------------------------|
| s3.analytics-accelerator.physicalio.metadatastore.capacity | `50` | Capacity of the metadata store |
| s3.analytics-accelerator.physicalio.blocksizebytes | `8MB` | Size of blocks for data transfer |
| s3.analytics-accelerator.physicalio.readaheadbytes | `64KB` | Number of bytes to read ahead |
| s3.analytics-accelerator.physicalio.maxrangesizebytes | `8MB` | Maximum size of range requests |
| s3.analytics-accelerator.physicalio.partsizebytes | `8MB` | Size of individual parts for transfer |
| s3.analytics-accelerator.physicalio.sequentialprefetch.base | `2.0` | Base factor for sequential prefetch sizing |
| s3.analytics-accelerator.physicalio.sequentialprefetch.speed | `1.0` | Speed factor for sequential prefetch growth |

##### Telemetry Configuration

| Property | Default | Description |
|------------------------------------------------------------------------|-------------------------------------|--------------------------------------------------------------------------|
| s3.analytics-accelerator.telemetry.level | `STANDARD` | Telemetry detail level (valid values: `CRITICAL`, `STANDARD`, `VERBOSE`) |
| s3.analytics-accelerator.telemetry.std.out.enabled | `false` | Enable stdout telemetry output |
| s3.analytics-accelerator.telemetry.logging.enabled | `true` | Enable logging telemetry output |
| s3.analytics-accelerator.telemetry.aggregations.enabled | `false` | Enable telemetry aggregations |
| s3.analytics-accelerator.telemetry.aggregations.flush.interval.seconds | `-1` | Interval to flush aggregated telemetry |
| s3.analytics-accelerator.telemetry.logging.level | `INFO` | Log level for telemetry |
| s3.analytics-accelerator.telemetry.logging.name | `com.amazon.connector.s3.telemetry` | Logger name for telemetry |
| s3.analytics-accelerator.telemetry.format | `default` | Telemetry output format (valid values: `json`, `default`) |

##### Object Client Configuration

| Property | Default | Description |
|------------------------------------------|---------|----------------------------------------------------------------|
| s3.analytics-accelerator.useragentprefix | `null` | Custom prefix to add to the `User-Agent` string in S3 requests |
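
The library-specific properties above are passed in the same way as any other catalog property. As an illustrative sketch (property names come from the tables above; the chosen values are examples, not tuning advice), the prefetching mode and telemetry level could be overridden with:

```
spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.type=glue \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.analytics-accelerator.enabled=true \
--conf spark.sql.catalog.my_catalog.s3.analytics-accelerator.logicalio.prefetching.mode=COLUMN_BOUND \
--conf spark.sql.catalog.my_catalog.s3.analytics-accelerator.telemetry.level=VERBOSE
```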

### S3 Dual-stack

[S3 Dual-stack](https://docs.aws.amazon.com/AmazonS3/latest/userguide/dual-stack-endpoints.html) allows a client to access an S3 bucket through a dual-stack endpoint.