Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applyRequesterPaysConfiguration)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applyRequesterPaysConfiguration)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.applyMutation(s3FileIOProperties::applyRequesterPaysConfiguration)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,19 @@ public class S3FileIOProperties implements Serializable {

public static final boolean S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true;

/**
* Determines if requester acknowledges transfer of billing cost to them, default to false.
*
* <p>For more details, see
* https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html
*/
public static final String REQUESTER_PAYS_ENABLED = "s3.requester-pays-enabled";

public static final boolean REQUESTER_PAYS_ENABLED_DEFAULT = false;

private static final String REQUESTER_PAYS_HEADER = "x-amz-request-payer";
private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

private String sseType;
private String sseKey;
private String sseMd5;
Expand Down Expand Up @@ -488,7 +501,7 @@ public class S3FileIOProperties implements Serializable {
private int s3RetryNumRetries;
private long s3RetryMinWaitMs;
private long s3RetryMaxWaitMs;

private final boolean isRequesterPaysEnabled;
private boolean s3DirectoryBucketListPrefixAsDirectory;
private final Map<String, String> allProperties;

Expand Down Expand Up @@ -523,6 +536,7 @@ public S3FileIOProperties() {
this.isRemoteSigningEnabled = REMOTE_SIGNING_ENABLED_DEFAULT;
this.isS3AccessGrantsEnabled = S3_ACCESS_GRANTS_ENABLED_DEFAULT;
this.isS3AccessGrantsFallbackToIamEnabled = S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT;
this.isRequesterPaysEnabled = REQUESTER_PAYS_ENABLED_DEFAULT;
this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT;
this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT;
this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
Expand Down Expand Up @@ -566,6 +580,9 @@ public S3FileIOProperties(Map<String, String> properties) {
this.isCrossRegionAccessEnabled =
PropertyUtil.propertyAsBoolean(
properties, CROSS_REGION_ACCESS_ENABLED, CROSS_REGION_ACCESS_ENABLED_DEFAULT);
this.isRequesterPaysEnabled =
PropertyUtil.propertyAsBoolean(
properties, REQUESTER_PAYS_ENABLED, REQUESTER_PAYS_ENABLED_DEFAULT);
try {
this.multiPartSize =
PropertyUtil.propertyAsInt(properties, MULTIPART_SIZE, MULTIPART_SIZE_DEFAULT);
Expand Down Expand Up @@ -1067,4 +1084,19 @@ private <T> T loadSdkPluginConfigurations(String impl, Map<String, String> prope
e);
}
}

public boolean isRequesterPaysEnabled() {
return isRequesterPaysEnabled;
}

public <T extends S3ClientBuilder> void applyRequesterPaysConfiguration(T builder) {
if (isRequesterPaysEnabled) {
ClientOverrideConfiguration.Builder configBuilder =
null != builder.overrideConfiguration()
? builder.overrideConfiguration().toBuilder()
: ClientOverrideConfiguration.builder();
builder.overrideConfiguration(
configBuilder.putHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public void testS3FileIOPropertiesDefaultValues() {
assertThat(S3FileIOProperties.DELETE_ENABLED_DEFAULT)
.isEqualTo(s3FileIOProperties.isDeleteEnabled());

assertThat(S3FileIOProperties.REQUESTER_PAYS_ENABLED_DEFAULT)
.isEqualTo(s3FileIOProperties.isRequesterPaysEnabled());

assertThat(Collections.emptyMap()).isEqualTo(s3FileIOProperties.bucketToAccessPointMapping());
}

Expand Down Expand Up @@ -264,6 +267,11 @@ public void testS3FileIOProperties() {
S3FileIOProperties.REMOTE_SIGNING_ENABLED,
String.valueOf(s3FileIOProperties.isRemoteSigningEnabled()));

assertThat(map)
.containsEntry(
S3FileIOProperties.REQUESTER_PAYS_ENABLED,
String.valueOf(s3FileIOProperties.isRequesterPaysEnabled()));

assertThat(map).containsEntry(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING");
}

Expand Down Expand Up @@ -412,6 +420,7 @@ private Map<String, String> getTestProperties() {
map.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true");
map.put(S3FileIOProperties.REMOTE_SIGNING_ENABLED, "true");
map.put(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING");
map.put(S3FileIOProperties.REQUESTER_PAYS_ENABLED, "true");
return map;
}

Expand Down Expand Up @@ -517,4 +526,18 @@ public void testApplyRetryConfiguration() {
RetryPolicy retryPolicy = builder.overrideConfiguration().retryPolicy().get();
assertThat(retryPolicy.numRetries()).as("retries was not set").isEqualTo(999);
}

@Test
public void testApplyRequesterPaysConfiguration() {
Map<String, String> properties =
ImmutableMap.of(S3FileIOProperties.REQUESTER_PAYS_ENABLED, "true");
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);
S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
s3FileIOProperties.applyRequesterPaysConfiguration(mockS3ClientBuilder);
ArgumentCaptor<ClientOverrideConfiguration> argumentCaptor =
ArgumentCaptor.forClass(ClientOverrideConfiguration.class);
Mockito.verify(mockS3ClientBuilder).overrideConfiguration(argumentCaptor.capture());
ClientOverrideConfiguration capturedArgument = argumentCaptor.getValue();
assertThat(capturedArgument.headers().get("x-amz-request-payer")).contains("requester");
}
}