diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index d586309a22d6..cdaa67a39b0f 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -66,7 +66,6 @@ import org.apache.iceberg.io.FileIOParser; import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.IOUtil; -import org.apache.iceberg.io.ImmutableStorageCredential; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.ResolvingFileIO; @@ -641,89 +640,6 @@ public void noStorageCredentialConfigured() { .isEqualTo("sessionTokenFromProperties"); } - @Test - public void singleStorageCredentialConfigured() { - StorageCredential s3Credential = - ImmutableStorageCredential.builder() - .prefix("s3://custom-uri") - .config( - ImmutableMap.of( - "s3.access-key-id", - "keyIdFromCredential", - "s3.secret-access-key", - "accessKeyFromCredential", - "s3.session-token", - "sessionTokenFromCredential")) - .build(); - - S3FileIO fileIO = new S3FileIO(); - fileIO.setCredentials(ImmutableList.of(s3Credential)); - fileIO.initialize( - ImmutableMap.of( - "s3.access-key-id", - "keyIdFromProperties", - "s3.secret-access-key", - "accessKeyFromProperties", - "s3.session-token", - "sessionTokenFromProperties")); - - ObjectAssert s3FileIOProperties = - assertThat(fileIO) - .extracting("s3FileIOProperties") - .asInstanceOf(InstanceOfAssertFactories.type(S3FileIOProperties.class)); - s3FileIOProperties.extracting(S3FileIOProperties::accessKeyId).isEqualTo("keyIdFromCredential"); - s3FileIOProperties - .extracting(S3FileIOProperties::secretAccessKey) - .isEqualTo("accessKeyFromCredential"); - s3FileIOProperties - .extracting(S3FileIOProperties::sessionToken) - .isEqualTo("sessionTokenFromCredential"); - } - - @Test - public void multipleStorageCredentialsConfigured() { - StorageCredential s3Credential1 = - ImmutableStorageCredential.builder() - .prefix("s3://custom-uri/1") - .config( - ImmutableMap.of( - "s3.access-key-id", - "keyIdFromCredential1", - "s3.secret-access-key", - "accessKeyFromCredential1", - "s3.session-token", - "sessionTokenFromCredential1")) - .build(); - - StorageCredential s3Credential2 = - ImmutableStorageCredential.builder() - .prefix("s3://custom-uri/2") - .config( - ImmutableMap.of( - "s3.access-key-id", - "keyIdFromCredential2", - "s3.secret-access-key", - "accessKeyFromCredential2", - "s3.session-token", - "sessionTokenFromCredential2")) - .build(); - - S3FileIO fileIO = new S3FileIO(); - fileIO.setCredentials(ImmutableList.of(s3Credential1, s3Credential2)); - assertThatThrownBy( - () -> - fileIO.initialize( - ImmutableMap.of( - "s3.access-key-id", - "keyIdFromProperties", - "s3.secret-access-key", - "accessKeyFromProperties", - "s3.session-token", - "sessionTokenFromProperties"))) - .isInstanceOf(IllegalStateException.class) - .hasMessage("Invalid S3 Credentials: only one S3 credential should exist"); - } - private void createRandomObjects(String prefix, int count) { S3URI s3URI = new S3URI(prefix); diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 7e9e7e6d553e..ac463617a0b9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -52,6 +52,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyServiceConfigurations) .applyMutation(s3FileIOProperties::applySignerConfiguration) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyStorageCredentialsInterceptor) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index 90ccd522fda2..aed88a162c23 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -116,6 +116,7 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) .applyMutation(s3FileIOProperties::applyRetryConfigurations) + .applyMutation(s3FileIOProperties::applyStorageCredentialsInterceptor) .build(); } @@ -133,6 +134,7 @@ public S3AsyncClient s3Async() { .applyMutation(awsClientProperties::applyClientRegionConfiguration) .applyMutation(awsClientProperties::applyClientCredentialConfigurations) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyStorageCredentialsInterceptor) .build(); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3ExecutionInterceptor.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3ExecutionInterceptor.java new file mode 100644 index 000000000000..1c6d9c13e8c9 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3ExecutionInterceptor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.io.StorageCredential; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.http.SdkHttpRequest; + +public class S3ExecutionInterceptor implements ExecutionInterceptor { + private final List storageCredentials; + + public S3ExecutionInterceptor(List storageCredentials) { + Preconditions.checkArgument(null != storageCredentials, "Invalid storage credentials: null"); + this.storageCredentials = storageCredentials; + } + + @Override + public SdkHttpRequest modifyHttpRequest( + Context.ModifyHttpRequest context, ExecutionAttributes executionAttributes) { + if (!storageCredentials.isEmpty()) { + SdkHttpRequest.Builder builder = context.httpRequest().toBuilder(); + String requestPath = context.httpRequest().encodedPath(); + StorageCredential credentialToUse = storageCredentials.get(0); + Map configToUse = null; + + for (StorageCredential storageCredential : storageCredentials) { + if (requestPath.startsWith(storageCredential.prefix()) + && storageCredential.prefix().length() >= credentialToUse.prefix().length()) { + configToUse = storageCredential.config(); + } + } + + if (null != configToUse) { + String accessKeyId = configToUse.get(S3FileIOProperties.ACCESS_KEY_ID); + String secretAccessKey = configToUse.get(S3FileIOProperties.SECRET_ACCESS_KEY); + String sessionToken = configToUse.get(S3FileIOProperties.SESSION_TOKEN); + String tokenExpiresAtMillis = + configToUse.get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS); + + if (null != accessKeyId && null != secretAccessKey) { + builder.putHeader("X-Amz-Access-Key", accessKeyId); + builder.putHeader("X-Amz-Secret-Key", secretAccessKey); + if (null != sessionToken) { + builder.putHeader("X-Amz-Security-Token", sessionToken); + } + + if (null != tokenExpiresAtMillis) { + Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(tokenExpiresAtMillis)); + long expiresInSeconds = + Math.max(1, expiresAt.minusMillis(Instant.now().toEpochMilli()).getEpochSecond()); + builder.putHeader("X-Amz-Expires", Long.toString(expiresInSeconds)); + } + } + + return builder.build(); + } + } + + return ExecutionInterceptor.super.modifyHttpRequest(context, executionAttributes); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 0409db4ff6ca..94f8225d2585 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -432,21 +432,27 @@ public String getCredential() { @Override public void initialize(Map props) { this.properties = SerializableMap.copyOf(props); - Map propertiesWithCredentials = - ImmutableMap.builder() - .putAll(properties) - .putAll(storageCredentialConfig()) - .buildKeepingLast(); - this.s3FileIOProperties = new S3FileIOProperties(propertiesWithCredentials); + this.s3FileIOProperties = new S3FileIOProperties(properties); this.createStack = PropertyUtil.propertyAsBoolean(props, "init-creation-stacktrace", true) ? Thread.currentThread().getStackTrace() : null; + Map propertiesWithStorageCredentials = + ImmutableMap.builder() + .putAll(properties) + .putAll( + StorageCredential.toMap( + storageCredentials.stream() + .filter(c -> c.prefix().startsWith("s3")) + .collect(Collectors.toList()))) + .build(); + // Do not override s3 client if it was provided if (s3 == null) { - Object clientFactory = S3FileIOAwsClientFactories.initialize(props); + Object clientFactory = + S3FileIOAwsClientFactories.initialize(propertiesWithStorageCredentials); if (clientFactory instanceof S3FileIOAwsClientFactory) { this.s3 = ((S3FileIOAwsClientFactory) clientFactory)::s3; } @@ -463,7 +469,8 @@ public void initialize(Map props) { // Do not override s3Async client if it was provided if (s3Async == null) { - Object clientFactory = S3FileIOAwsClientFactories.initialize(props); + Object clientFactory = + S3FileIOAwsClientFactories.initialize(propertiesWithStorageCredentials); if (clientFactory instanceof S3FileIOAwsClientFactory) { this.s3Async = ((S3FileIOAwsClientFactory) clientFactory)::s3Async; } @@ -575,16 +582,4 @@ public void setCredentials(List credentials) { public List credentials() { return ImmutableList.copyOf(storageCredentials); } - - private Map storageCredentialConfig() { - List s3Credentials = - storageCredentials.stream() - .filter(c -> c.prefix().startsWith("s3")) - .collect(Collectors.toList()); - - Preconditions.checkState( - s3Credentials.size() <= 1, "Invalid S3 Credentials: only one S3 credential should exist"); - - return s3Credentials.isEmpty() ? Map.of() : s3Credentials.get(0).config(); - } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 7f266f099d1e..b466fd1a9d48 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -22,6 +22,7 @@ import java.net.URI; import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -32,12 +33,14 @@ import org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SerializableMap; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.exception.SdkServiceException; @@ -531,6 +534,7 @@ public class S3FileIOProperties implements Serializable { private int s3RetryNumRetries; private long s3RetryMinWaitMs; private long s3RetryMaxWaitMs; + private List storageCredentials; private boolean s3DirectoryBucketListPrefixAsDirectory; private final Map allProperties; @@ -576,6 +580,7 @@ public S3FileIOProperties() { this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT; this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT; this.allProperties = Maps.newHashMap(); + this.storageCredentials = List.of(); ValidationException.check( keyIdAccessKeyBothConfigured(), @@ -698,6 +703,8 @@ public S3FileIOProperties(Map properties) { PropertyUtil.propertyAsInt( properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT); + this.storageCredentials = StorageCredential.fromMap(properties); + ValidationException.check( keyIdAccessKeyBothConfigured(), "S3 client access key ID and secret access key must be set at the same time"); @@ -1011,6 +1018,19 @@ public void applySignerConfiguration(T builder) { } } + public void applyStorageCredentialsInterceptor(T builder) { + if (!storageCredentials.isEmpty()) { + ClientOverrideConfiguration.Builder configBuilder = + null != builder.overrideConfiguration() + ? builder.overrideConfiguration().toBuilder() + : ClientOverrideConfiguration.builder(); + builder.overrideConfiguration( + configBuilder + .addExecutionInterceptor(new S3ExecutionInterceptor(storageCredentials)) + .build()); + } + } + /** * Override the endpoint for an S3 client. * diff --git a/core/src/main/java/org/apache/iceberg/io/StorageCredential.java b/core/src/main/java/org/apache/iceberg/io/StorageCredential.java index 193a820f9974..f743a89cb661 100644 --- a/core/src/main/java/org/apache/iceberg/io/StorageCredential.java +++ b/core/src/main/java/org/apache/iceberg/io/StorageCredential.java @@ -19,8 +19,14 @@ package org.apache.iceberg.io; import java.io.Serializable; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; import org.immutables.value.Value; @Value.Immutable @@ -39,4 +45,48 @@ default void validate() { static StorageCredential create(String prefix, Map config) { return ImmutableStorageCredential.builder().prefix(prefix).config(config).build(); } + + static Map toMap(List credentials) { + Preconditions.checkArgument(null != credentials, "Invalid storage credentials: null"); + Map config = Maps.newHashMap(); + for (StorageCredential credential : credentials) { + String key = "storage-credential." + credential.prefix(); + config.put(key, credential.prefix()); + credential + .config() + .forEach( + (k, v) -> { + String innerKey = key + "." + k; + config.put(innerKey, v); + }); + } + + return config; + } + + static List fromMap(Map map) { + Preconditions.checkArgument(null != map, "Invalid storage credentials config: null"); + Map config = PropertyUtil.propertiesWithPrefix(map, "storage-credential."); + List credentials = Lists.newArrayList(); + + Set prefixes = + config.entrySet().stream() + .filter(entry -> entry.getKey().equals(entry.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + prefixes.forEach( + prefix -> { + Map configForCredential = + PropertyUtil.propertiesWithPrefix(config, prefix + "."); + Preconditions.checkArgument( + !configForCredential.isEmpty(), + "Invalid storage credential config with prefix %s: null or empty", + prefix); + StorageCredential credential = create(prefix, configForCredential); + credentials.add(credential); + }); + + return credentials; + } } diff --git a/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java b/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java index 40e3d62a3c39..7ec5df35b26a 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java +++ b/core/src/test/java/org/apache/iceberg/io/TestStorageCredential.java @@ -22,8 +22,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; public class TestStorageCredential { @@ -54,4 +57,53 @@ public void javaSerDe() throws IOException, ClassNotFoundException { StorageCredential.create("randomPrefix", Map.of("token", "storageToken")); assertThat(TestHelpers.roundTripSerialize(credential)).isEqualTo(credential); } + + @Test + public void roundTripFromToMap() { + assertThat(StorageCredential.fromMap(ImmutableMap.of())).isEmpty(); + + List credentials = + ImmutableList.of( + StorageCredential.create( + "s3://bucket1", + Map.of("accessKey", "key1", "secretKey", "secretKey1", "otherKey", "otherVal")), + StorageCredential.create( + "s3://bucket2", + Map.of("accessKey", "key2", "secretKey", "secretKey2", "otherKey", "otherVal")), + StorageCredential.create( + "s3", + Map.of( + "accessKey", + "genericKey", + "secretKey", + "genericSecretKey", + "key1", + "val1", + "key2", + "val2"))); + + assertThat(StorageCredential.fromMap(StorageCredential.toMap(credentials))) + .hasSameElementsAs(credentials); + } + + @Test + public void invalidToMap() { + assertThatThrownBy(() -> StorageCredential.toMap(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid storage credentials: null"); + } + + @Test + public void invalidFromMap() { + assertThatThrownBy(() -> StorageCredential.fromMap(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid storage credentials config: null"); + + assertThatThrownBy( + () -> + StorageCredential.fromMap( + ImmutableMap.of("storage-credential.s3://bucket", "s3://bucket"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid storage credential config with prefix s3://bucket: null or empty"); + } }