diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 4149d795d30d..610a9a55b82e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -49,6 +49,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyServiceConfigurations) .applyMutation(s3FileIOProperties::applySignerConfiguration) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyRequesterPaysConfiguration) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index 7554b5629be4..9f6dbc1e0d35 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -115,6 +115,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyRequesterPaysConfiguration) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java index 8687d737a5d7..73b75f3ee1ad 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java @@ -56,6 +56,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyRequesterPaysConfiguration) .build(); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 8d97b9d1bf20..d2ee506cea00 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -454,6 +454,19 @@ public class S3FileIOProperties implements Serializable { public static final boolean S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true; + /** + * Determines if requester acknowledges transfer of billing cost to them, default to false. + * + *

For more details, see + * https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html + */ + public static final String REQUESTER_PAYS_ENABLED = "s3.requester-pays-enabled"; + + public static final boolean REQUESTER_PAYS_ENABLED_DEFAULT = false; + + private static final String REQUESTER_PAYS_HEADER = "x-amz-request-payer"; + private static final String REQUESTER_PAYS_HEADER_VALUE = "requester"; + private String sseType; private String sseKey; private String sseMd5; @@ -488,7 +501,7 @@ public class S3FileIOProperties implements Serializable { private int s3RetryNumRetries; private long s3RetryMinWaitMs; private long s3RetryMaxWaitMs; - + private final boolean isRequesterPaysEnabled; private boolean s3DirectoryBucketListPrefixAsDirectory; private final Map allProperties; @@ -523,6 +536,7 @@ public S3FileIOProperties() { this.isRemoteSigningEnabled = REMOTE_SIGNING_ENABLED_DEFAULT; this.isS3AccessGrantsEnabled = S3_ACCESS_GRANTS_ENABLED_DEFAULT; this.isS3AccessGrantsFallbackToIamEnabled = S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT; + this.isRequesterPaysEnabled = REQUESTER_PAYS_ENABLED_DEFAULT; this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT; this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT; this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT; @@ -566,6 +580,9 @@ public S3FileIOProperties(Map properties) { this.isCrossRegionAccessEnabled = PropertyUtil.propertyAsBoolean( properties, CROSS_REGION_ACCESS_ENABLED, CROSS_REGION_ACCESS_ENABLED_DEFAULT); + this.isRequesterPaysEnabled = + PropertyUtil.propertyAsBoolean( + properties, REQUESTER_PAYS_ENABLED, REQUESTER_PAYS_ENABLED_DEFAULT); try { this.multiPartSize = PropertyUtil.propertyAsInt(properties, MULTIPART_SIZE, MULTIPART_SIZE_DEFAULT); @@ -1067,4 +1084,19 @@ private T loadSdkPluginConfigurations(String impl, Map prope e); } } + + public boolean isRequesterPaysEnabled() { + return isRequesterPaysEnabled; + } + + public void applyRequesterPaysConfiguration(T builder) { + if (isRequesterPaysEnabled) { + ClientOverrideConfiguration.Builder configBuilder = + null != builder.overrideConfiguration() + ? builder.overrideConfiguration().toBuilder() + : ClientOverrideConfiguration.builder(); + builder.overrideConfiguration( + configBuilder.putHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE).build()); + } + } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index 71b931257cf5..1e5018499fba 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -121,6 +121,9 @@ public void testS3FileIOPropertiesDefaultValues() { assertThat(S3FileIOProperties.DELETE_ENABLED_DEFAULT) .isEqualTo(s3FileIOProperties.isDeleteEnabled()); + assertThat(S3FileIOProperties.REQUESTER_PAYS_ENABLED_DEFAULT) + .isEqualTo(s3FileIOProperties.isRequesterPaysEnabled()); + assertThat(Collections.emptyMap()).isEqualTo(s3FileIOProperties.bucketToAccessPointMapping()); } @@ -264,6 +267,11 @@ public void testS3FileIOProperties() { S3FileIOProperties.REMOTE_SIGNING_ENABLED, String.valueOf(s3FileIOProperties.isRemoteSigningEnabled())); + assertThat(map) + .containsEntry( + S3FileIOProperties.REQUESTER_PAYS_ENABLED, + String.valueOf(s3FileIOProperties.isRequesterPaysEnabled())); + assertThat(map).containsEntry(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING"); } @@ -412,6 +420,7 @@ private Map getTestProperties() { map.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true"); map.put(S3FileIOProperties.REMOTE_SIGNING_ENABLED, "true"); map.put(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING"); + map.put(S3FileIOProperties.REQUESTER_PAYS_ENABLED, "true"); return map; } @@ -517,4 +526,18 @@ public void testApplyRetryConfiguration() { RetryPolicy retryPolicy = builder.overrideConfiguration().retryPolicy().get(); assertThat(retryPolicy.numRetries()).as("retries was not set").isEqualTo(999); } + + @Test + public void testApplyRequesterPaysConfiguration() { + Map properties = + ImmutableMap.of(S3FileIOProperties.REQUESTER_PAYS_ENABLED, "true"); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class); + s3FileIOProperties.applyRequesterPaysConfiguration(mockS3ClientBuilder); + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(ClientOverrideConfiguration.class); + Mockito.verify(mockS3ClientBuilder).overrideConfiguration(argumentCaptor.capture()); + ClientOverrideConfiguration capturedArgument = argumentCaptor.getValue(); + assertThat(capturedArgument.headers().get("x-amz-request-payer")).contains("requester"); + } }