diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index 9d5d41438a62..55e43aa303cc 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -68,6 +68,7 @@ import software.amazon.awssdk.services.s3control.S3ControlClient; import software.amazon.awssdk.utils.ImmutableMap; import software.amazon.awssdk.utils.IoUtils; +import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; public class TestS3FileIOIntegration { @@ -255,6 +256,48 @@ public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception { validateRead(s3FileIO); } + @Test + public void testNewInputStreamWithAnalyticsAccelerator() throws Exception { + s3.putObject( + PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + RequestBody.fromBytes(contentBytes)); + S3FileIO s3FileIO = new S3FileIO(); + s3FileIO.initialize( + ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true))); + validateRead(s3FileIO); + } + + @Test + public void testNewInputStreamWithAnalyticsAcceleratorAndCRT() throws Exception { + s3.putObject( + PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), + RequestBody.fromBytes(contentBytes)); + S3FileIO s3FileIO = new S3FileIO(); + s3FileIO.initialize( + ImmutableMap.of( + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, + String.valueOf(true), + S3FileIOProperties.S3_CRT_ENABLED, + String.valueOf(true))); + validateRead(s3FileIO); + } + + @Test + public void testNewInputStreamWithAnalyticsAcceleratorCustomConfigured() throws Exception { + final String prefetchingMode = "logicalio.prefetching.mode"; + final String s3Uri = String.format("s3://%s/%s/%s.parquet", bucketName, prefix, objectKey); + S3FileIO s3FileIO = new S3FileIO(); + s3FileIO.initialize( + 
ImmutableMap.of( + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, + String.valueOf(true), + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX + prefetchingMode, + PrefetchMode.ALL.name())); + write(s3FileIO, s3Uri); + validateRead(s3FileIO, s3Uri); + s3FileIO.deleteFile(s3Uri); + } + @Test public void testNewOutputStream() throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); @@ -324,6 +367,19 @@ public void testNewOutputStreamWithMultiRegionAccessPoint() throws Exception { } } + @Test + public void testNewOutputStreamWithAnalyticsAccelerator() throws Exception { + S3FileIO s3FileIO = new S3FileIO(); + s3FileIO.initialize( + ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true))); + write(s3FileIO); + try (InputStream stream = + s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(objectKey).build())) { + String result = IoUtils.toUtf8String(stream); + assertThat(result).isEqualTo(content); + } + } + @Test public void testServerSideS3Encryption() throws Exception { S3FileIOProperties properties = new S3FileIOProperties(); diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 4149d795d30d..50269859c383 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -28,7 +28,10 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; import software.amazon.awssdk.services.sts.StsClient; import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; @@ -52,6 +55,14 @@ public S3Client s3() { .build(); } + @Override + public S3AsyncClient s3Async() { + if (s3FileIOProperties.isS3CRTEnabled()) { + return S3AsyncClient.crtBuilder().applyMutation(this::applyAssumeRoleConfigurations).build(); + } + return S3AsyncClient.builder().applyMutation(this::applyAssumeRoleConfigurations).build(); + } + @Override public GlueClient glue() { return GlueClient.builder() @@ -95,24 +106,25 @@ public void initialize(Map properties) { protected T applyAssumeRoleConfigurations( T clientBuilder) { - AssumeRoleRequest assumeRoleRequest = - AssumeRoleRequest.builder() - .roleArn(awsProperties.clientAssumeRoleArn()) - .roleSessionName(roleSessionName) - .durationSeconds(awsProperties.clientAssumeRoleTimeoutSec()) - .externalId(awsProperties.clientAssumeRoleExternalId()) - .tags(awsProperties.stsClientAssumeRoleTags()) - .build(); clientBuilder - .credentialsProvider( - StsAssumeRoleCredentialsProvider.builder() - .stsClient(sts()) - .refreshRequest(assumeRoleRequest) - .build()) + .credentialsProvider(createCredentialsProvider()) .region(Region.of(awsProperties.clientAssumeRoleRegion())); return clientBuilder; } + protected S3AsyncClientBuilder applyAssumeRoleConfigurations(S3AsyncClientBuilder clientBuilder) { + return clientBuilder + .credentialsProvider(createCredentialsProvider()) + .region(Region.of(awsProperties.clientAssumeRoleRegion())); + } + + protected S3CrtAsyncClientBuilder applyAssumeRoleConfigurations( + S3CrtAsyncClientBuilder clientBuilder) { + return clientBuilder + .credentialsProvider(createCredentialsProvider()) + .region(Region.of(awsProperties.clientAssumeRoleRegion())); + } + protected String region() { return awsProperties.clientAssumeRoleRegion(); } @@ -145,4 +157,21 @@ private String genSessionName() { } return String.format("iceberg-aws-%s", UUID.randomUUID()); } + + 
private StsAssumeRoleCredentialsProvider createCredentialsProvider() { + return StsAssumeRoleCredentialsProvider.builder() + .stsClient(sts()) + .refreshRequest(createAssumeRoleRequest()) + .build(); + } + + private AssumeRoleRequest createAssumeRoleRequest() { + return AssumeRoleRequest.builder() + .roleArn(awsProperties.clientAssumeRoleArn()) + .roleSessionName(roleSessionName) + .durationSeconds(awsProperties.clientAssumeRoleTimeoutSec()) + .externalId(awsProperties.clientAssumeRoleExternalId()) + .tags(awsProperties.stsClientAssumeRoleTags()) + .build(); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index 7554b5629be4..5d581b01bf30 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -39,6 +39,7 @@ import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.GlueClientBuilder; import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; @@ -118,6 +119,14 @@ public S3Client s3() { .build(); } + @Override + public S3AsyncClient s3Async() { + if (s3FileIOProperties.isS3CRTEnabled()) { + return S3AsyncClient.crtBuilder().build(); + } + return S3AsyncClient.builder().build(); + } + @Override public GlueClient glue() { return GlueClient.builder() diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java index daef0d048090..549dc038e25f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java @@ -23,6 +23,7 @@ import 
software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; /** @@ -38,6 +39,13 @@ public interface AwsClientFactory extends Serializable { */ S3Client s3(); + /** + * create an Amazon S3 async client (used by S3FileIO when the analytics accelerator is enabled) + * + * @return s3 async client + */ + S3AsyncClient s3Async(); + /** * create a AWS Glue client * diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java new file mode 100644 index 000000000000..197f79905f0f --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.aws.s3; + +import java.io.IOException; +import org.apache.iceberg.io.SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; + +/** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */ +class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream { + + private final S3SeekableInputStream delegate; + + AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) { + this.delegate = stream; + } + + @Override + public int read() throws IOException { + return this.delegate.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return this.delegate.read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return this.delegate.read(b, off, len); + } + + @Override + public void seek(long l) throws IOException { + this.delegate.seek(l); + } + + @Override + public long getPos() { + return this.delegate.getPos(); + } + + @Override + public void close() throws IOException { + this.delegate.close(); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java new file mode 100644 index 000000000000..192ed015204a --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.IOException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration; +import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.request.ObjectClient; +import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; +import software.amazon.s3.analyticsaccelerator.util.S3URI; + +/** Opens analytics accelerator streams; caches one stream factory per client and properties. */ +class AnalyticsAcceleratorUtil { + + private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class); + + // Factories are keyed by (async client, properties). Close a factory whenever it leaves the + // cache (explicit invalidation or size-based eviction) instead of only dropping the reference. + // NOTE(review): confirm whether closing a factory also closes the S3AsyncClient handed to + // S3SdkObjectClient; if so, size-based eviction could close a client that is still in use. + private static final Cache<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory> + STREAM_FACTORY_CACHE = + Caffeine.newBuilder() + .maximumSize(100) + .removalListener( + (RemovalListener< + Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>) + (key, factory, cause) -> close(factory)) + .build(); + + private AnalyticsAcceleratorUtil() {} + + /** + * Opens an analytics accelerator stream for the given input file. + * + * @param inputFile file to read; supplies the URI, object metadata, async client and properties + * @return a seekable stream wrapping the accelerator stream + * @throws RuntimeIOException if creating the accelerator stream fails with an IOException + */ + public static SeekableInputStream newStream(S3InputFile inputFile) { + S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key()); + HeadObjectResponse metadata = 
inputFile.getObjectMetadata(); + OpenStreamInformation openStreamInfo = + OpenStreamInformation.builder() + .objectMetadata( + ObjectMetadata.builder() + .contentLength(metadata.contentLength()) + .etag(metadata.eTag()) + .build()) + .build(); + + S3SeekableInputStreamFactory factory = + STREAM_FACTORY_CACHE.get( + Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()), + AnalyticsAcceleratorUtil::createNewFactory); + + try { + S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo); + return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream); + } catch (IOException e) { + throw new RuntimeIOException( + e, "Failed to create S3 analytics accelerator input stream for: %s", inputFile.uri()); + } + } + + /** Builds a stream factory from the async client and accelerator properties in the cache key. */ + private static S3SeekableInputStreamFactory createNewFactory( + Pair<S3AsyncClient, S3FileIOProperties> cacheKey) { + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties()); + S3SeekableInputStreamConfiguration streamConfiguration = + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); + ObjectClientConfiguration objectClientConfiguration = + ObjectClientConfiguration.fromConfiguration(connectorConfiguration); + + ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration); + return new S3SeekableInputStreamFactory(objectClient, streamConfiguration); + } + + /** + * Invalidates the cached stream factory for this client and properties so that it gets closed. + * + * @param asyncClient async client the factory was created with + * @param s3FileIOProperties properties the factory was created with + */ + public static void cleanupCache( + S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) { + STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties)); + } + + /** Closes a factory removed from the cache; any failure is logged as cleanup is best-effort. */ + private static void close(S3SeekableInputStreamFactory factory) { + if (factory != null) { + try { + factory.close(); + } catch (Exception e) { + LOG.warn("Failed to close S3SeekableInputStreamFactory", e); + } + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java index 88b212a37bec..2ea7651fc847 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java @@ -20,6 +20,7 @@ import org.apache.iceberg.metrics.MetricsContext; import 
software.amazon.awssdk.http.HttpStatusCode; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; @@ -27,14 +28,20 @@ abstract class BaseS3File { private final S3Client client; + private final S3AsyncClient asyncClient; private final S3URI uri; private final S3FileIOProperties s3FileIOProperties; private HeadObjectResponse metadata; private final MetricsContext metrics; BaseS3File( - S3Client client, S3URI uri, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { + S3Client client, + S3AsyncClient asyncClient, + S3URI uri, + S3FileIOProperties s3FileIOProperties, + MetricsContext metrics) { this.client = client; + this.asyncClient = asyncClient; this.uri = uri; this.s3FileIOProperties = s3FileIOProperties; this.metrics = metrics; @@ -48,6 +55,10 @@ S3Client client() { return client; } + S3AsyncClient asyncClient() { + return asyncClient; + } + S3URI uri() { return uri; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java index 8687d737a5d7..b72148a45343 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.aws.HttpClientProperties; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; class DefaultS3FileIOAwsClientFactory implements S3FileIOAwsClientFactory { @@ -58,4 +59,12 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyRetryConfigurations) .build(); } + + @Override + public S3AsyncClient s3Async() { + if 
(s3FileIOProperties.isS3CRTEnabled()) { + return S3AsyncClient.crtBuilder().build(); + } + return S3AsyncClient.builder().build(); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 23b246c357c9..172686d3d343 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -56,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; @@ -87,9 +88,11 @@ public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRec private String credential = null; private SerializableSupplier s3; + private SerializableSupplier s3Async; private S3FileIOProperties s3FileIOProperties; private SerializableMap properties = null; private transient volatile S3Client client; + private transient volatile S3AsyncClient asyncClient; private MetricsContext metrics = MetricsContext.nullMetrics(); private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); private transient StackTraceElement[] createStack; @@ -109,7 +112,19 @@ public S3FileIO() {} * @param s3 s3 supplier */ public S3FileIO(SerializableSupplier s3) { - this(s3, new S3FileIOProperties()); + this(s3, null, new S3FileIOProperties()); + } + + /** + * Constructor with custom s3 supplier and s3Async supplier. + * + *

Calling {@link S3FileIO#initialize(Map)} will overwrite information set in this constructor. + * + * @param s3 s3 supplier + * @param s3Async s3Async supplier + */ + public S3FileIO(SerializableSupplier s3, SerializableSupplier s3Async) { + this(s3, s3Async, new S3FileIOProperties()); } /** @@ -121,23 +136,50 @@ public S3FileIO(SerializableSupplier s3) { * @param s3FileIOProperties S3 FileIO properties */ public S3FileIO(SerializableSupplier s3, S3FileIOProperties s3FileIOProperties) { + this(s3, null, s3FileIOProperties); + } + + /** + * Constructor with custom s3 supplier, s3Async supplier and S3FileIO properties. + * + *

Calling {@link S3FileIO#initialize(Map)} will overwrite information set in this constructor. + * + * @param s3 s3 supplier + * @param s3Async s3Async supplier + * @param s3FileIOProperties S3 FileIO properties + */ + public S3FileIO( + SerializableSupplier s3, + SerializableSupplier s3Async, + S3FileIOProperties s3FileIOProperties) { this.s3 = s3; + this.s3Async = s3Async; this.s3FileIOProperties = s3FileIOProperties; this.createStack = Thread.currentThread().getStackTrace(); } @Override public InputFile newInputFile(String path) { + if (shouldUseAsyncClient()) { + return S3InputFile.fromLocation(path, client(), asyncClient(), s3FileIOProperties, metrics); + } return S3InputFile.fromLocation(path, client(), s3FileIOProperties, metrics); } @Override public InputFile newInputFile(String path, long length) { + if (shouldUseAsyncClient()) { + return S3InputFile.fromLocation( + path, length, client(), asyncClient(), s3FileIOProperties, metrics); + } return S3InputFile.fromLocation(path, length, client(), s3FileIOProperties, metrics); } @Override public OutputFile newOutputFile(String path) { + if (shouldUseAsyncClient()) { + return S3OutputFile.fromLocation(path, client(), asyncClient(), s3FileIOProperties, metrics); + } return S3OutputFile.fromLocation(path, client(), s3FileIOProperties, metrics); } @@ -343,6 +385,32 @@ public S3Client client() { return client; } + /** + * Returns the S3 async client, creating it from the s3Async supplier on first use. + * + * @return the S3 async client + * @throws IllegalStateException if no s3Async supplier was passed to a constructor or set by + * {@link #initialize(Map)} + */ + public S3AsyncClient asyncClient() { + if (asyncClient == null) { + synchronized (this) { + if (asyncClient == null) { + if (s3Async == null) { + throw new IllegalStateException( + "Cannot create S3 async client: s3Async supplier is not set"); + } + asyncClient = s3Async.get(); + } + } + } + return asyncClient; + } + + private boolean shouldUseAsyncClient() { + return s3FileIOProperties.isS3AnalyticsAcceleratorEnabled(); + } + private ExecutorService executorService() { if (executorService == null) { synchronized (S3FileIO.class) { @@ -388,6 +456,17 @@ public void initialize(Map props) { } } + // Do not override s3Async client if it was provided + if (s3Async == null) { + Object clientFactory = 
S3FileIOAwsClientFactories.initialize(props); + if (clientFactory instanceof S3FileIOAwsClientFactory) { + this.s3Async = ((S3FileIOAwsClientFactory) clientFactory)::s3Async; + } + if (clientFactory instanceof AwsClientFactory) { + this.s3Async = ((AwsClientFactory) clientFactory)::s3Async; + } + } + initMetrics(properties); } @@ -415,6 +494,14 @@ public void close() { if (client != null) { client.close(); } + if (asyncClient != null) { + // the accelerator cache only holds an entry when the accelerator is enabled; skip the + // lookup otherwise so the public asyncClient() can be used without the accelerator classes + if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) { + AnalyticsAcceleratorUtil.cleanupCache(asyncClient, s3FileIOProperties); + } + asyncClient.close(); + } } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java index 718298818a30..4b2d5ccb1504 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Map; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public interface S3FileIOAwsClientFactory extends Serializable { @@ -30,6 +31,13 @@ public interface S3FileIOAwsClientFactory extends Serializable { */ S3Client s3(); + /** + * create an Amazon S3 async client (used by S3FileIO when the analytics accelerator is enabled) + * + * @return s3 async client + */ + S3AsyncClient s3Async(); + /** * Initialize AWS client factory from catalog properties. 
* diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 8d97b9d1bf20..adaa6376f569 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -72,6 +72,32 @@ public class S3FileIOProperties implements Serializable { public static final boolean S3_ACCESS_GRANTS_ENABLED_DEFAULT = false; + /** + * This property is used to enable using the S3 Analytics Accelerator library to accelerate data + * access from client applications to Amazon S3. + * + *

For more details, see: https://github.com/awslabs/analytics-accelerator-s3 + */ + public static final String S3_ANALYTICS_ACCELERATOR_ENABLED = "s3.analytics-accelerator.enabled"; + + public static final boolean S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false; + + /** + * This prefix allows users to configure the internal properties of the s3 analytics accelerator. + * + *

Example: s3.analytics-accelerator.logicalio.prefetching.mode=all + * + *

For more details, see: + * https://github.com/awslabs/analytics-accelerator-s3/blob/main/doc/CONFIGURATION.md + */ + public static final String S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX = + "s3.analytics-accelerator."; + + /** This property is used to specify if the S3 Async clients should be created using CRT. */ + public static final String S3_CRT_ENABLED = "s3.crt.enabled"; + + public static final boolean S3_CRT_ENABLED_DEFAULT = true; + /** * The fallback-to-iam property allows users to customize whether or not they would like their * jobs fall back to the Job Execution IAM role in case they get an Access Denied from the S3 @@ -484,6 +510,9 @@ public class S3FileIOProperties implements Serializable { private final boolean isAccelerationEnabled; private final String endpoint; private final boolean isRemoteSigningEnabled; + private final boolean isS3AnalyticsAcceleratorEnabled; + private final Map s3AnalyticsacceleratorProperties; + private final boolean isS3CRTEnabled; private String writeStorageClass; private int s3RetryNumRetries; private long s3RetryMinWaitMs; @@ -528,6 +557,9 @@ public S3FileIOProperties() { this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT; this.s3DirectoryBucketListPrefixAsDirectory = S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT; + this.isS3AnalyticsAcceleratorEnabled = S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT; + this.s3AnalyticsacceleratorProperties = Maps.newHashMap(); + this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT; this.allProperties = Maps.newHashMap(); ValidationException.check( @@ -640,6 +672,13 @@ public S3FileIOProperties(Map properties) { properties, S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY, S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT); + this.isS3AnalyticsAcceleratorEnabled = + PropertyUtil.propertyAsBoolean( + properties, S3_ANALYTICS_ACCELERATOR_ENABLED, S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT); + this.s3AnalyticsacceleratorProperties = + PropertyUtil.propertiesWithPrefix(properties, 
S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX); + this.isS3CRTEnabled = + PropertyUtil.propertyAsBoolean(properties, S3_CRT_ENABLED, S3_CRT_ENABLED_DEFAULT); ValidationException.check( keyIdAccessKeyBothConfigured(), @@ -754,6 +793,18 @@ public boolean isRemoteSigningEnabled() { return this.isRemoteSigningEnabled; } + public boolean isS3AnalyticsAcceleratorEnabled() { + return isS3AnalyticsAcceleratorEnabled; + } + + public Map s3AnalyticsacceleratorProperties() { + return s3AnalyticsacceleratorProperties; + } + + public boolean isS3CRTEnabled() { + return isS3CRTEnabled; + } + public String endpoint() { return this.endpoint; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java index 5ad82c153fbc..e86bf4db397b 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java @@ -23,6 +23,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.MetricsContext; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryptedFile { @@ -36,20 +37,53 @@ public static S3InputFile fromLocation( MetricsContext metrics) { return new S3InputFile( client, + null, new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()), null, s3FileIOProperties, metrics); } + public static S3InputFile fromLocation( + String location, + S3Client client, + S3AsyncClient asyncClient, + S3FileIOProperties s3FileIOProperties, + MetricsContext metrics) { + return new S3InputFile( + client, + asyncClient, + new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()), + null, + s3FileIOProperties, + metrics); + } + + public static S3InputFile fromLocation( + String location, + long length, + S3Client client, + 
S3FileIOProperties s3FileIOProperties, + MetricsContext metrics) { + return new S3InputFile( + client, + null, + new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()), + length > 0 ? length : null, + s3FileIOProperties, + metrics); + } + public static S3InputFile fromLocation( String location, long length, S3Client client, + S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { return new S3InputFile( client, + asyncClient, new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()), length > 0 ? length : null, s3FileIOProperties, @@ -58,11 +92,12 @@ public static S3InputFile fromLocation( S3InputFile( S3Client client, + S3AsyncClient asyncClient, S3URI uri, Long length, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { - super(client, uri, s3FileIOProperties, metrics); + super(client, asyncClient, uri, s3FileIOProperties, metrics); this.length = length; } @@ -82,6 +117,9 @@ public long getLength() { @Override public SeekableInputStream newStream() { + if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) { + return AnalyticsAcceleratorUtil.newStream(this); + } return new S3InputStream(client(), uri(), s3FileIOProperties(), metrics()); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java index abe7a55fba10..af69c1b8681c 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java @@ -27,6 +27,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.metrics.MetricsContext; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public class S3OutputFile extends BaseS3File implements OutputFile, NativelyEncryptedFile { @@ -39,14 +40,33 @@ public static S3OutputFile fromLocation( MetricsContext 
metrics) { return new S3OutputFile( client, + null, + new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()), + s3FileIOProperties, + metrics); + } + + public static S3OutputFile fromLocation( + String location, + S3Client client, + S3AsyncClient asyncClient, + S3FileIOProperties s3FileIOProperties, + MetricsContext metrics) { + return new S3OutputFile( + client, + asyncClient, new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()), s3FileIOProperties, metrics); } S3OutputFile( - S3Client client, S3URI uri, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { - super(client, uri, s3FileIOProperties, metrics); + S3Client client, + S3AsyncClient asyncClient, + S3URI uri, + S3FileIOProperties s3FileIOProperties, + MetricsContext metrics) { + super(client, asyncClient, uri, s3FileIOProperties, metrics); } /** @@ -75,7 +95,7 @@ public PositionOutputStream createOrOverwrite() { @Override public InputFile toInputFile() { - return new S3InputFile(client(), uri(), null, s3FileIOProperties(), metrics()); + return new S3InputFile(client(), asyncClient(), uri(), null, s3FileIOProperties(), metrics()); } @Override diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java index a22eb1549d34..cc5854617463 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java +++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.GetTablesRequest; import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public class TestAwsClientFactories { @@ -292,6 +293,11 @@ public S3Client s3() { return null; } + @Override + public S3AsyncClient s3Async() { + return null; + } + @Override public 
GlueClient glue() { return null; diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java index 6a2edcf01ed2..76f47adb17e8 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; class StaticClientFactory implements AwsClientFactory { @@ -33,6 +34,11 @@ public S3Client s3() { return client; } + @Override + public S3AsyncClient s3Async() { + return null; + } + @Override public GlueClient glue() { return null; diff --git a/build.gradle b/build.gradle index f747c68cbfcd..d08fc1bc7d11 100644 --- a/build.gradle +++ b/build.gradle @@ -470,6 +470,8 @@ project(':iceberg-aws') { compileOnly("software.amazon.awssdk:dynamodb") compileOnly("software.amazon.awssdk:lakeformation") + compileOnly(libs.analyticsaccelerator.s3) + compileOnly(libs.hadoop3.common) { exclude group: 'org.apache.avro', module: 'avro' exclude group: 'org.slf4j', module: 'slf4j-log4j12' diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d532c3efaca3..0bf2dfe1c4df 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -22,6 +22,7 @@ [versions] activation = "1.1.1" aliyun-sdk-oss = "3.10.2" +analyticsaccelerator = "1.0.0" antlr = "4.9.3" aircompressor = "0.27" apiguardian = "1.1.2" @@ -88,6 +89,7 @@ tez08 = { strictly = "0.8.4"} # see rich version usage explanation above activation = { module = "javax.activation:activation", version.ref = "activation" } aircompressor = { module = "io.airlift:aircompressor", version.ref = "aircompressor" } aliyun-sdk-oss = { module = 
"com.aliyun.oss:aliyun-sdk-oss", version.ref = "aliyun-sdk-oss" } +analyticsaccelerator-s3 = { module = "software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3", version.ref = "analyticsaccelerator" } antlr-antlr4 = { module = "org.antlr:antlr4", version.ref = "antlr" } antlr-runtime = { module = "org.antlr:antlr4-runtime", version.ref = "antlr" } arrow-memory-netty = { module = "org.apache.arrow:arrow-memory-netty", version.ref = "arrow" } diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 634c70326627..75d8fbe9b784 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -175,6 +175,7 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') { integrationImplementation libs.testcontainers integrationImplementation libs.httpcomponents.httpclient5 integrationImplementation libs.awaitility + integrationImplementation libs.analyticsaccelerator.s3 } task integrationTest(type: Test) {