diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java index 89b1620d7339..f66d6b33050f 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java @@ -21,8 +21,11 @@ import java.util.List; import java.util.stream.Collectors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; import software.amazon.awssdk.services.s3.S3Client; @@ -31,6 +34,9 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3control.S3ControlClient; +import software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest; +import software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest; public class AwsIntegTestUtil { @@ -39,6 +45,25 @@ public class AwsIntegTestUtil { private AwsIntegTestUtil() { } + /** + * Get the environment variable AWS_REGION to use for testing + * @return region + */ + public static String testRegion() { + return System.getenv("AWS_REGION"); + } + + /** + * Get the environment variable AWS_CROSS_REGION to use for testing + * @return region + */ + public static String testCrossRegion() { + String crossRegion = System.getenv("AWS_CROSS_REGION"); + Preconditions.checkArgument(!testRegion().equals(crossRegion), "AWS_REGION should not be equal to " + + "AWS_CROSS_REGION"); + return crossRegion; + } + /** * Set the environment variable AWS_TEST_BUCKET for a default bucket to use for testing * @return bucket name @@ -47,6 +72,14 @@ public static String testBucketName() { return System.getenv("AWS_TEST_BUCKET"); } + /** + * Set the environment variable AWS_TEST_CROSS_REGION_BUCKET for a default bucket to use for testing + * @return bucket name + */ + public static String testCrossRegionBucketName() { + return System.getenv("AWS_TEST_CROSS_REGION_BUCKET"); + } + /** * Set the environment variable AWS_TEST_ACCOUNT_ID for a default account to use for testing * @return account id @@ -81,4 +114,38 @@ public static void cleanGlueCatalog(GlueClient glue, List namespaces) { } } } + + public static S3ControlClient createS3ControlClient(String region) { + return S3ControlClient.builder() + .httpClientBuilder(UrlConnectionHttpClient.builder()) + .region(Region.of(region)) + .build(); + } + + public static void createAccessPoint(S3ControlClient s3ControlClient, String accessPointName, String bucketName) { + try { + s3ControlClient.createAccessPoint(CreateAccessPointRequest + .builder() + .name(accessPointName) + .bucket(bucketName) + .accountId(testAccountId()) + .build() + ); + } catch (Exception e) { + LOG.error("Cannot create access point {}", accessPointName, e); + } + } + + public static void deleteAccessPoint(S3ControlClient s3ControlClient, String accessPointName) { + try { + s3ControlClient.deleteAccessPoint(DeleteAccessPointRequest + .builder() + .name(accessPointName) + .accountId(testAccountId()) + .build() + ); + } catch (Exception e) { + LOG.error("Cannot delete access point {}", accessPointName, e); + } + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index fbce29e05056..52dde3394e93 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -38,12 +38,16 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.PartitionMetadata; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.kms.model.ListAliasesRequest; import software.amazon.awssdk.services.kms.model.ListAliasesResponse; @@ -57,14 +61,21 @@ import software.amazon.awssdk.services.s3.model.Permission; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; +import software.amazon.awssdk.services.s3control.S3ControlClient; +import software.amazon.awssdk.utils.ImmutableMap; import software.amazon.awssdk.utils.IoUtils; public class TestS3FileIOIntegration { private static AwsClientFactory clientFactory; private static S3Client s3; + private static S3ControlClient s3Control; + private static S3ControlClient crossRegionS3Control; private static KmsClient kms; private static String bucketName; + private static String crossRegionBucketName; + private static String accessPointName; + private static String crossRegionAccessPointName; private static String prefix; private static byte[] contentBytes; private static String content; @@ -78,17 +89,27 @@ public static void beforeClass() { clientFactory = AwsClientFactories.defaultFactory(); s3 = clientFactory.s3(); kms = clientFactory.kms(); + s3Control = AwsIntegTestUtil.createS3ControlClient(AwsIntegTestUtil.testRegion()); + crossRegionS3Control = AwsIntegTestUtil.createS3ControlClient(AwsIntegTestUtil.testCrossRegion()); bucketName = AwsIntegTestUtil.testBucketName(); + crossRegionBucketName = AwsIntegTestUtil.testCrossRegionBucketName(); + accessPointName = UUID.randomUUID().toString(); + crossRegionAccessPointName = UUID.randomUUID().toString(); prefix = UUID.randomUUID().toString(); contentBytes = new byte[1024 * 1024 * 10]; deletionBatchSize = 3; content = new String(contentBytes, StandardCharsets.UTF_8); kmsKeyArn = kms.createKey().keyMetadata().arn(); + + AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName); + AwsIntegTestUtil.createAccessPoint(crossRegionS3Control, crossRegionAccessPointName, crossRegionBucketName); } @AfterClass public static void afterClass() { AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix); + AwsIntegTestUtil.deleteAccessPoint(s3Control, accessPointName); + AwsIntegTestUtil.deleteAccessPoint(crossRegionS3Control, crossRegionAccessPointName); kms.scheduleKeyDeletion(ScheduleKeyDeletionRequest.builder().keyId(kmsKeyArn).pendingWindowInDays(7).build()); } @@ -98,6 +119,11 @@ public void before() { objectUri = String.format("s3://%s/%s", bucketName, objectKey); } + @BeforeEach + public void beforeEach() { + clientFactory.initialize(Maps.newHashMap()); + } + @Test public void testNewInputStream() throws Exception { s3.putObject(PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), @@ -106,6 +132,33 @@ public void testNewInputStream() throws Exception { validateRead(s3FileIO); } + @Test + public void testNewInputStreamWithAccessPoint() throws Exception { + s3.putObject(PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + RequestBody.fromBytes(contentBytes)); + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); + s3FileIO.initialize(ImmutableMap.of(AwsProperties.S3_ACCESS_POINTS_PREFIX + bucketName, + testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName))); + validateRead(s3FileIO); + } + + @Test + public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception { + clientFactory.initialize(ImmutableMap.of(AwsProperties.S3_USE_ARN_REGION_ENABLED, "true")); + S3Client s3Client = clientFactory.s3(); + s3Client.putObject(PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + RequestBody.fromBytes(contentBytes)); + // make a copy in cross-region bucket + s3Client.putObject(PutObjectRequest.builder() + .bucket(testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName)) + .key(objectKey).build(), + RequestBody.fromBytes(contentBytes)); + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); + s3FileIO.initialize(ImmutableMap.of(AwsProperties.S3_ACCESS_POINTS_PREFIX + bucketName, + testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName))); + validateRead(s3FileIO); + } + @Test public void testNewOutputStream() throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); @@ -116,6 +169,34 @@ public void testNewOutputStream() throws Exception { Assert.assertEquals(content, result); } + @Test + public void testNewOutputStreamWithAccessPoint() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); + s3FileIO.initialize(ImmutableMap.of(AwsProperties.S3_ACCESS_POINTS_PREFIX + bucketName, + testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName))); + write(s3FileIO); + InputStream stream = s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(objectKey).build()); + String result = IoUtils.toUtf8String(stream); + stream.close(); + Assert.assertEquals(content, result); + } + + @Test + public void testNewOutputStreamWithCrossRegionAccessPoint() throws Exception { + clientFactory.initialize(ImmutableMap.of(AwsProperties.S3_USE_ARN_REGION_ENABLED, "true")); + S3Client s3Client = clientFactory.s3(); + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); + s3FileIO.initialize(ImmutableMap.of(AwsProperties.S3_ACCESS_POINTS_PREFIX + bucketName, + testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName))); + write(s3FileIO); + InputStream stream = s3Client.getObject(GetObjectRequest.builder() + .bucket(testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName)) + .key(objectKey).build()); + String result = IoUtils.toUtf8String(stream); + stream.close(); + Assert.assertEquals(content, result); + } + @Test public void testServerSideS3Encryption() throws Exception { AwsProperties properties = new AwsProperties(); @@ -218,6 +299,23 @@ public void testDeleteFilesMultipleBatches() throws Exception { testDeleteFiles(deletionBatchSize * 2, s3FileIO); } + @Test + public void testDeleteFilesMultipleBatchesWithAccessPoints() throws Exception { + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties()); + s3FileIO.initialize(ImmutableMap.of(AwsProperties.S3_ACCESS_POINTS_PREFIX + bucketName, + testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName))); + testDeleteFiles(deletionBatchSize * 2, s3FileIO); + } + + @Test + public void testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints() throws Exception { + clientFactory.initialize(ImmutableMap.of(AwsProperties.S3_USE_ARN_REGION_ENABLED, "true")); + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties()); + s3FileIO.initialize(ImmutableMap.of(AwsProperties.S3_ACCESS_POINTS_PREFIX + bucketName, + testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName))); + testDeleteFiles(deletionBatchSize * 2, s3FileIO); + } + @Test public void testDeleteFilesLessThanBatchSize() throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, getDeletionTestProperties()); @@ -268,4 +366,10 @@ private void validateRead(S3FileIO s3FileIO) throws Exception { stream.close(); Assert.assertEquals(content, result); } + + private String testAccessPointARN(String region, String accessPoint) { + // format: arn:aws:s3:region:account-id:accesspoint/resource + return String.format("arn:%s:s3:%s:%s:accesspoint/%s", + PartitionMetadata.of(Region.of(region)).id(), region, AwsIntegTestUtil.testAccountId(), accessPoint); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 056dd1c52549..58ba2e259f9e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -45,6 +45,7 @@ public class AssumeRoleAwsClientFactory implements AwsClientFactory { private int timeout; private String region; private String s3Endpoint; + private boolean s3UseArnRegionEnabled; private String httpClientType; @Override @@ -52,6 +53,7 @@ public S3Client s3() { return S3Client.builder() .applyMutation(this::configure) .applyMutation(builder -> AwsClientFactories.configureEndpoint(builder, s3Endpoint)) + .serviceConfiguration(s -> s.useArnRegionEnabled(s3UseArnRegionEnabled).build()) .build(); } @@ -84,6 +86,8 @@ public void initialize(Map properties) { this.s3Endpoint = properties.get(AwsProperties.S3FILEIO_ENDPOINT); this.tags = toTags(properties); + this.s3UseArnRegionEnabled = PropertyUtil.propertyAsBoolean(properties, AwsProperties.S3_ACCESS_POINTS_PREFIX, + AwsProperties.S3_USE_ARN_REGION_ENABLED_DEFAULT); this.httpClientType = PropertyUtil.propertyAsString(properties, AwsProperties.HTTP_CLIENT_TYPE, AwsProperties.HTTP_CLIENT_TYPE_DEFAULT); } @@ -125,6 +129,10 @@ protected String httpClientType() { return httpClientType; } + protected boolean s3UseArnRegionEnabled() { + return s3UseArnRegionEnabled; + } + private StsClient sts() { return StsClient.builder() .httpClientBuilder(AwsClientFactories.configureHttpClientBuilder(httpClientType)) diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index 1baec27d1627..da7617072319 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -83,6 +83,7 @@ static class DefaultAwsClientFactory implements AwsClientFactory { private String s3AccessKeyId; private String s3SecretAccessKey; private String s3SessionToken; + private Boolean s3UseArnRegionEnabled; private String httpClientType; DefaultAwsClientFactory() { @@ -93,6 +94,7 @@ public S3Client s3() { return S3Client.builder() .httpClientBuilder(configureHttpClientBuilder(httpClientType)) .applyMutation(builder -> configureEndpoint(builder, s3Endpoint)) + .serviceConfiguration(s -> s.useArnRegionEnabled(s3UseArnRegionEnabled).build()) .credentialsProvider(credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken)) .build(); } @@ -118,6 +120,8 @@ public void initialize(Map properties) { this.s3AccessKeyId = properties.get(AwsProperties.S3FILEIO_ACCESS_KEY_ID); this.s3SecretAccessKey = properties.get(AwsProperties.S3FILEIO_SECRET_ACCESS_KEY); this.s3SessionToken = properties.get(AwsProperties.S3FILEIO_SESSION_TOKEN); + this.s3UseArnRegionEnabled = PropertyUtil.propertyAsBoolean(properties, AwsProperties.S3_USE_ARN_REGION_ENABLED, + AwsProperties.S3_USE_ARN_REGION_ENABLED_DEFAULT); ValidationException.check((s3AccessKeyId == null && s3SecretAccessKey == null) || (s3AccessKeyId != null && s3SecretAccessKey != null), diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index b576f5180b05..40eff777587d 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -27,6 +27,7 @@ import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.PropertyUtil; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -185,6 +186,17 @@ public class AwsProperties implements Serializable { */ public static final String S3FILEIO_SESSION_TOKEN = "s3.session-token"; + /** + * Enable to make S3FileIO, to make cross-region call to the region specified in the ARN of an access point. + *

+ * By default, attempting to use an access point in a different region will throw an exception. + * When enabled, this property allows using access points in other regions. + *

+ * For more details see: https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3Configuration.html#useArnRegionEnabled-- + */ + public static final String S3_USE_ARN_REGION_ENABLED = "s3.use-arn-region-enabled"; + public static final boolean S3_USE_ARN_REGION_ENABLED_DEFAULT = false; + /** * Enables eTag checks for S3 PUT and MULTIPART upload requests. */ @@ -291,6 +303,16 @@ public class AwsProperties implements Serializable { */ public static final String S3_WRITE_TAGS_PREFIX = "s3.write.tags."; + /** + * Used by {@link S3FileIO}, prefix used for bucket access point configuration. + * To set, we can pass a catalog property. + *

+ * For more details, see https://aws.amazon.com/s3/features/access-points/ + *

+ * Example: s3.access-points.my-bucket=access-point + */ + public static final String S3_ACCESS_POINTS_PREFIX = "s3.access-points."; + /** * @deprecated will be removed at 0.15.0, please use {@link #S3_CHECKSUM_ENABLED_DEFAULT} instead */ @@ -320,6 +342,7 @@ public class AwsProperties implements Serializable { private ObjectCannedACL s3FileIoAcl; private boolean isS3ChecksumEnabled; private final Set s3WriteTags; + private final Map s3BucketToAccessPointMapping; private String glueCatalogId; private boolean glueCatalogSkipArchive; @@ -340,6 +363,7 @@ public AwsProperties() { this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir"); this.isS3ChecksumEnabled = S3_CHECKSUM_ENABLED_DEFAULT; this.s3WriteTags = Sets.newHashSet(); + this.s3BucketToAccessPointMapping = ImmutableMap.of(); this.glueCatalogId = null; this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; @@ -403,6 +427,7 @@ public AwsProperties(Map properties) { String.format("Deletion batch size must be between 1 and %s", S3FILEIO_DELETE_BATCH_SIZE_MAX)); this.s3WriteTags = toTags(properties, S3_WRITE_TAGS_PREFIX); + this.s3BucketToAccessPointMapping = PropertyUtil.propertiesWithPrefix(properties, S3_ACCESS_POINTS_PREFIX); this.dynamoDbTableName = PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT); @@ -530,4 +555,8 @@ private Set toTags(Map properties, String prefix) { .map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build()) .collect(Collectors.toSet()); } + + public Map s3BucketToAccessPointMapping() { + return s3BucketToAccessPointMapping; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java index 975bbdc76d88..e4de99f530a6 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java @@ -81,6 +81,7 @@ public S3Client s3() { .httpClientBuilder(AwsClientFactories.configureHttpClientBuilder(httpClientType())) .applyMutation(builder -> AwsClientFactories.configureEndpoint(builder, s3Endpoint())) .credentialsProvider(new LakeFormationCredentialsProvider(lakeFormation(), buildTableArn())) + .serviceConfiguration(s -> s.useArnRegionEnabled(s3UseArnRegionEnabled()).build()) .region(Region.of(region())) .build(); } else { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 9aea488f613a..e1ce98702e15 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -108,7 +108,7 @@ public OutputFile newOutputFile(String path) { @Override public void deleteFile(String path) { - S3URI location = new S3URI(path); + S3URI location = new S3URI(path, awsProperties.s3BucketToAccessPointMapping()); DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder().bucket(location.bucket()).key(location.key()).build(); @@ -128,7 +128,7 @@ public void deleteFiles(Iterable paths) throws BulkDeletionFailureExcept SetMultimap bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet); int numberOfFailedDeletions = 0; for (String path : paths) { - S3URI location = new S3URI(path); + S3URI location = new S3URI(path, awsProperties.s3BucketToAccessPointMapping()); String bucket = location.bucket(); String objectKey = location.key(); Set objectsInBucket = bucketToObjects.get(bucket); diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java index 0ca69aad194d..a7acdcaa11f5 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java @@ -28,7 +28,8 @@ public class S3InputFile extends BaseS3File implements InputFile { public static S3InputFile fromLocation(String location, S3Client client, AwsProperties awsProperties, MetricsContext metrics) { - return new S3InputFile(client, new S3URI(location), awsProperties, metrics); + return new S3InputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()), + awsProperties, metrics); } S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics) { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java index 48b2ee4f0cae..2becc35a263f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java @@ -32,7 +32,8 @@ public class S3OutputFile extends BaseS3File implements OutputFile { public static S3OutputFile fromLocation(String location, S3Client client, AwsProperties awsProperties, MetricsContext metrics) { - return new S3OutputFile(client, new S3URI(location), awsProperties, metrics); + return new S3OutputFile(client, new S3URI(location, awsProperties.s3BucketToAccessPointMapping()), + awsProperties, metrics); } S3OutputFile(S3Client client, S3URI uri, AwsProperties awsProperties, MetricsContext metrics) { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java index a59976cbf838..3398cbac9ba6 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java @@ -19,14 +19,18 @@ package org.apache.iceberg.aws.s3; +import java.util.Map; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; /** * This class represents a fully qualified location in S3 for input/output - * operations expressed as as URI. This implementation is provided to + * operations expressed as as URI. This implementation is provided to * ensure compatibility with Hadoop Path implementations that may introduce * encoding issues with native URI implementation. + * If the bucket in the location has an access point in the mapping, the + * access point is used to perform all the S3 operations. * * Note: Path-style access is deprecated and not supported by this * implementation. @@ -50,6 +54,20 @@ class S3URI { * @param location fully qualified URI */ S3URI(String location) { + this(location, ImmutableMap.of()); + } + + /** + * Creates a new S3URI in the form of scheme://(bucket|accessPoint)/key?query#fragment with additional information + * on accessPoints. + *

+ * The URI supports any valid URI schemes to be backwards compatible with s3a and s3n, + * and also allows users to use S3FileIO with other S3-compatible object storage services like GCS. + * + * @param location fully qualified URI + * @param bucketToAccessPointMapping contains mapping of bucket to access point + */ + S3URI(String location, Map bucketToAccessPointMapping) { Preconditions.checkNotNull(location, "Location cannot be null."); this.location = location; @@ -59,7 +77,8 @@ class S3URI { String [] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2); ValidationException.check(authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: %s", location); ValidationException.check(!authoritySplit[1].trim().isEmpty(), "Invalid S3 URI, path is empty: %s", location); - this.bucket = authoritySplit[0]; + this.bucket = bucketToAccessPointMapping == null ? authoritySplit[0] : bucketToAccessPointMapping.getOrDefault( + authoritySplit[0], authoritySplit[0]); // Strip query and fragment if they exist String path = authoritySplit[1]; diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java index fdd13264ba35..16b71e238c0d 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java @@ -19,8 +19,10 @@ package org.apache.iceberg.aws.s3; +import java.util.Map; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.Test; @@ -90,4 +92,17 @@ public void testValidSchemes() { assertEquals("path/to/file", uri.key()); } } + + @Test + public void testS3URIWithBucketToAccessPointMapping() { + String p1 = "s3://bucket/path/to/file?query=foo#bar"; + Map bucketToAccessPointMapping = ImmutableMap.of( + "bucket", "access-point" + ); + S3URI uri1 = new S3URI(p1, bucketToAccessPointMapping); + + assertEquals("access-point", uri1.bucket()); + assertEquals("path/to/file", uri1.key()); + assertEquals(p1, uri1.toString()); + } } diff --git a/build.gradle b/build.gradle index abd11efa1d06..447f7f1e3f63 100644 --- a/build.gradle +++ b/build.gradle @@ -328,6 +328,7 @@ project(':iceberg-aws') { } testImplementation 'software.amazon.awssdk:iam' + testImplementation 'software.amazon.awssdk:s3control' testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') testImplementation("com.adobe.testing:s3mock-junit4") { exclude module: "spring-boot-starter-logging"