diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index ebee07e53e4c..f1c3206a93ab 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -98,7 +98,6 @@ import software.amazon.awssdk.identity.spi.AwsSessionCredentialsIdentity; import software.amazon.awssdk.identity.spi.IdentityProvider; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ServiceClientConfiguration; import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException; @@ -507,17 +506,13 @@ public void fileIOWithPrefixedS3ClientWithoutCredentialsSerialization( io.initialize(Map.of(AwsClientProperties.CLIENT_REGION, "us-east-1")); assertThat(io.client()).isInstanceOf(S3Client.class); - assertThat(io.asyncClient()).isInstanceOf(S3AsyncClient.class); assertThat(io.client("s3a://my-bucket/my-path")).isInstanceOf(S3Client.class); - assertThat(io.asyncClient("s3a://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class); S3FileIO fileIO = roundTripSerializer.apply(io); assertThat(fileIO.credentials()).isEqualTo(io.credentials()).isEmpty(); assertThat(fileIO.client()).isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient()).isInstanceOf(S3AsyncClient.class); assertThat(fileIO.client("s3a://my-bucket/my-path")).isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient("s3a://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class); } @ParameterizedTest @@ -533,18 +528,14 @@ public void fileIOWithPrefixedS3ClientSerialization( // there should be a client for the generic and specific storage prefix available assertThat(io.client()).isInstanceOf(S3Client.class); - assertThat(io.asyncClient()).isInstanceOf(S3AsyncClient.class); assertThat(io.client("s3://my-bucket/my-path")).isInstanceOf(S3Client.class); - assertThat(io.asyncClient("s3://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class); S3FileIO fileIO = roundTripSerializer.apply(io); assertThat(fileIO.credentials()).isEqualTo(io.credentials()); // make sure there's a client for the generic and specific storage prefix available after ser/de assertThat(fileIO.client()).isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient()).isInstanceOf(S3AsyncClient.class); assertThat(fileIO.client("s3://my-bucket/my-path")).isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient("s3://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class); } @Test @@ -641,9 +632,6 @@ public void resolvingFileIOLoadWithoutStorageCredentials( assertThat(fileIO.client("s3://foo/bar")) .isSameAs(fileIO.client()) .isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient("s3://foo/bar")) - .isSameAs(fileIO.asyncClient()) - .isInstanceOf(S3AsyncClient.class); }); // make sure credentials can be accessed after serde @@ -662,9 +650,6 @@ public void resolvingFileIOLoadWithoutStorageCredentials( assertThat(fileIO.client("s3://foo/bar")) .isSameAs(fileIO.client()) .isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient("s3://foo/bar")) - .isSameAs(fileIO.asyncClient()) - .isInstanceOf(S3AsyncClient.class); }); } @@ -696,9 +681,6 @@ public void resolvingFileIOLoadWithStorageCredentials( assertThat(fileIO.client("s3://foo/bar")) .isNotSameAs(fileIO.client()) .isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient("s3://foo/bar")) - .isNotSameAs(fileIO.asyncClient()) - .isInstanceOf(S3AsyncClient.class); }); // make sure credentials are still present after serde @@ -721,9 +703,6 @@ public void resolvingFileIOLoadWithStorageCredentials( assertThat(fileIO.client("s3://foo/bar")) .isNotSameAs(fileIO.client()) .isInstanceOf(S3Client.class); - assertThat(fileIO.asyncClient("s3://foo/bar")) - .isNotSameAs(fileIO.asyncClient()) - .isInstanceOf(S3AsyncClient.class); }); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 59a4d8d3ac38..e4d08df77bd9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -28,8 +28,6 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; import software.amazon.awssdk.services.sts.StsClient; @@ -56,26 +54,6 @@ public S3Client s3() { .build(); } - @Override - public S3AsyncClient s3Async() { - if (s3FileIOProperties.isS3CRTEnabled()) { - return S3AsyncClient.crtBuilder() - .applyMutation(this::applyAssumeRoleConfigurations) - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .applyMutation(s3FileIOProperties::applyS3CrtConfigurations) - .build(); - } - return S3AsyncClient.builder() - .applyMutation(this::applyAssumeRoleConfigurations) - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) - .applyMutation(awsClientProperties::applyLegacyMd5Plugin) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .build(); - } - @Override public GlueClient glue() { return GlueClient.builder() @@ -125,12 +103,6 @@ protected T applyAssumeRoleC return clientBuilder; } - protected S3AsyncClientBuilder applyAssumeRoleConfigurations(S3AsyncClientBuilder clientBuilder) { - return clientBuilder - .credentialsProvider(createCredentialsProvider()) - .region(Region.of(awsProperties.clientAssumeRoleRegion())); - } - protected S3CrtAsyncClientBuilder applyAssumeRoleConfigurations( S3CrtAsyncClientBuilder clientBuilder) { return clientBuilder diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index cd5715b93b63..0972e4eb4b3e 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -39,7 +39,6 @@ import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.GlueClientBuilder; import software.amazon.awssdk.services.kms.KmsClient; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; @@ -120,26 +119,6 @@ public S3Client s3() { .build(); } - @Override - public S3AsyncClient s3Async() { - if (s3FileIOProperties.isS3CRTEnabled()) { - return S3AsyncClient.crtBuilder() - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation( - b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b)) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .applyMutation(s3FileIOProperties::applyS3CrtConfigurations) - .build(); - } - return S3AsyncClient.builder() - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyLegacyMd5Plugin) - .applyMutation( - b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b)) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .build(); - } - @Override public GlueClient glue() { return GlueClient.builder() diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java index 549dc038e25f..daef0d048090 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java @@ -23,7 +23,6 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; /** @@ -39,13 +38,6 @@ public interface AwsClientFactory extends Serializable { */ S3Client s3(); - /** - * create a Amazon S3 async client - * - * @return s3 async client - */ - S3AsyncClient s3Async(); - /** * create a AWS Glue client * diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java index cf73e80f44c1..68e42f7015f1 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java @@ -39,7 +39,6 @@ import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.LegacyMd5Plugin; -import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; public class AwsClientProperties implements Serializable { /** @@ -148,21 +147,6 @@ void applyClientRegionConfiguration(BuilderT builder) { } } - /** - * Configure an S3 CRT client region. - * - *

Sample usage: - * - *

-   *     S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientRegionConfiguration)
-   * 
- */ - public void applyClientRegionConfiguration(T builder) { - if (clientRegion != null) { - builder.region(Region.of(clientRegion)); - } - } - /** * Configure the credential provider for AWS clients. * @@ -179,21 +163,6 @@ void applyClientCredentialConfigurations(BuilderT builder) { } } - /** - * Configure the credential provider for S3 CRT clients. - * - *

Sample usage: - * - *

-   *     S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientCredentialConfigurations)
-   * 
- */ - public void applyClientCredentialConfigurations(T builder) { - if (!Strings.isNullOrEmpty(this.clientCredentialsProvider)) { - builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider)); - } - } - /** * Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an * instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java index 197f79905f0f..e605ba46e3d8 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java @@ -19,31 +19,68 @@ package org.apache.iceberg.aws.s3; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.Counter; +import org.apache.iceberg.metrics.MetricsContext; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.common.ObjectRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */ -class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream { +class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream implements RangeReadable { + private static final Logger LOG = + LoggerFactory.getLogger(AnalyticsAcceleratorInputStreamWrapper.class); private final S3SeekableInputStream delegate; + private final Counter readBytes; + private final Counter readOperations; AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) { + this(stream, MetricsContext.nullMetrics()); + } + + AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream, MetricsContext metrics) { this.delegate = stream; + this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES); + this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS); } @Override public int read() throws IOException { - return this.delegate.read(); + int nextByteValue = this.delegate.read(); + if (nextByteValue != -1) { + readBytes.increment(); + } + readOperations.increment(); + return nextByteValue; } @Override - public int read(byte[] b) throws IOException { - return this.delegate.read(b, 0, b.length); + public int read(byte[] b, int off, int len) throws IOException { + int bytesRead = this.delegate.read(b, off, len); + if (bytesRead > 0) { + readBytes.increment(bytesRead); + } + readOperations.increment(); + return bytesRead; } @Override - public int read(byte[] b, int off, int len) throws IOException { - return this.delegate.read(b, off, len); + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + this.delegate.readFully(position, buffer, offset, length); + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + return this.delegate.readTail(buffer, offset, length); } @Override @@ -56,6 +93,21 @@ public long getPos() { return this.delegate.getPos(); } + @Override + public void readVectored(List ranges, IntFunction allocate) throws IOException { + List objectRanges = ranges.stream() + .map( + range -> + new software.amazon.s3.analyticsaccelerator.common.ObjectRange( + range.byteBuffer(), range.offset(), range.length())) + .collect(Collectors.toList()); + + // This does not keep track of bytes read and that is a possible improvement. + readOperations.increment(); + this.delegate.readVectored(objectRanges, allocate, + buffer -> LOG.info("Release buffer of length {}: {}", buffer.limit(), buffer)); + } + @Override public void close() throws IOException { this.delegate.close(); diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java index ed1c3a4879e3..b4ab04d3756a 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java @@ -27,30 +27,27 @@ import org.apache.iceberg.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.S3Client; import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration; -import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient; import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; -import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; -import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; class AnalyticsAcceleratorUtil { private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class); - private static final Cache, S3SeekableInputStreamFactory> + private static final Cache, S3SeekableInputStreamFactory> STREAM_FACTORY_CACHE = Caffeine.newBuilder() .maximumSize(100) .removalListener( (RemovalListener< - Pair, S3SeekableInputStreamFactory>) + Pair, S3SeekableInputStreamFactory>) (key, factory, cause) -> close(factory)) .build(); @@ -58,23 +55,14 @@ private AnalyticsAcceleratorUtil() {} public static SeekableInputStream newStream(S3InputFile inputFile) { S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key()); - HeadObjectResponse metadata = inputFile.getObjectMetadata(); - OpenStreamInformation openStreamInfo = - OpenStreamInformation.builder() - .objectMetadata( - ObjectMetadata.builder() - .contentLength(metadata.contentLength()) - .etag(metadata.eTag()) - .build()) - .build(); S3SeekableInputStreamFactory factory = STREAM_FACTORY_CACHE.get( - Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()), + Pair.of(inputFile.client(), inputFile.s3FileIOProperties()), AnalyticsAcceleratorUtil::createNewFactory); try { - S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo); + S3SeekableInputStream seekableInputStream = factory.createStream(uri); return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream); } catch (IOException e) { throw new RuntimeIOException( @@ -83,15 +71,15 @@ public static SeekableInputStream newStream(S3InputFile inputFile) { } private static S3SeekableInputStreamFactory createNewFactory( - Pair cacheKey) { + Pair cacheKey) { ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties()); S3SeekableInputStreamConfiguration streamConfiguration = S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); ObjectClientConfiguration objectClientConfiguration = ObjectClientConfiguration.fromConfiguration(connectorConfiguration); - - ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration); + ObjectClient objectClient = + new S3SyncSdkObjectClient(cacheKey.first(), objectClientConfiguration); return new S3SeekableInputStreamFactory(objectClient, streamConfiguration); } @@ -105,8 +93,7 @@ private static void close(S3SeekableInputStreamFactory factory) { } } - public static void cleanupCache( - S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) { - STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties)); + public static void cleanupCache(S3Client client, S3FileIOProperties s3FileIOProperties) { + STREAM_FACTORY_CACHE.invalidate(Pair.of(client, s3FileIOProperties)); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java index 2ea7651fc847..88b212a37bec 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java @@ -20,7 +20,6 @@ import org.apache.iceberg.metrics.MetricsContext; import software.amazon.awssdk.http.HttpStatusCode; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; @@ -28,20 +27,14 @@ abstract class BaseS3File { private final S3Client client; - private final S3AsyncClient asyncClient; private final S3URI uri; private final S3FileIOProperties s3FileIOProperties; private HeadObjectResponse metadata; private final MetricsContext metrics; BaseS3File( - S3Client client, - S3AsyncClient asyncClient, - S3URI uri, - S3FileIOProperties s3FileIOProperties, - MetricsContext metrics) { + S3Client client, S3URI uri, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { this.client = client; - this.asyncClient = asyncClient; this.uri = uri; this.s3FileIOProperties = s3FileIOProperties; this.metrics = metrics; @@ -55,10 +48,6 @@ S3Client client() { return client; } - S3AsyncClient asyncClient() { - return asyncClient; - } - S3URI uri() { return uri; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java index 2dec40e7f897..ee2d344cea2b 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java @@ -21,7 +21,6 @@ import java.util.Map; import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.aws.HttpClientProperties; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; class DefaultS3FileIOAwsClientFactory implements S3FileIOAwsClientFactory { @@ -60,22 +59,4 @@ public S3Client s3() { .applyMutation(s3FileIOProperties::applyRetryConfigurations) .build(); } - - @Override - public S3AsyncClient s3Async() { - if (s3FileIOProperties.isS3CRTEnabled()) { - return S3AsyncClient.crtBuilder() - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .applyMutation(s3FileIOProperties::applyS3CrtConfigurations) - .build(); - } - return S3AsyncClient.builder() - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) - .applyMutation(awsClientProperties::applyLegacyMd5Plugin) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .build(); - } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java b/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java index 400792cf976c..946962594047 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java @@ -24,7 +24,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.util.SerializableSupplier; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; class PrefixedS3Client implements AutoCloseable { @@ -32,21 +31,15 @@ class PrefixedS3Client implements AutoCloseable { private final String storagePrefix; private final S3FileIOProperties s3FileIOProperties; private SerializableSupplier s3; - private SerializableSupplier s3Async; private transient volatile S3Client s3Client; - private transient volatile S3AsyncClient s3AsyncClient; PrefixedS3Client( - String storagePrefix, - Map properties, - SerializableSupplier s3, - SerializableSupplier s3Async) { + String storagePrefix, Map properties, SerializableSupplier s3) { Preconditions.checkArgument( !Strings.isNullOrEmpty(storagePrefix), "Invalid storage prefix: null or empty"); Preconditions.checkArgument(null != properties, "Invalid properties: null"); this.storagePrefix = storagePrefix; this.s3 = s3; - this.s3Async = s3Async; this.s3FileIOProperties = new S3FileIOProperties(properties); // Do not override s3 client if it was provided if (s3 == null) { @@ -61,17 +54,6 @@ class PrefixedS3Client implements AutoCloseable { s3(); } } - - // Do not override s3Async client if it was provided - if (s3Async == null) { - Object clientFactory = S3FileIOAwsClientFactories.initialize(properties); - if (clientFactory instanceof S3FileIOAwsClientFactory) { - this.s3Async = ((S3FileIOAwsClientFactory) clientFactory)::s3Async; - } - if (clientFactory instanceof AwsClientFactory) { - this.s3Async = ((AwsClientFactory) clientFactory)::s3Async; - } - } } public String storagePrefix() { @@ -90,18 +72,6 @@ public S3Client s3() { return s3Client; } - public S3AsyncClient s3Async() { - if (s3AsyncClient == null) { - synchronized (this) { - if (s3AsyncClient == null) { - s3AsyncClient = s3Async.get(); - } - } - } - - return s3AsyncClient; - } - public S3FileIOProperties s3FileIOProperties() { return s3FileIOProperties; } @@ -109,15 +79,11 @@ public S3FileIOProperties s3FileIOProperties() { @Override public void close() { if (null != s3Client) { - s3Client.close(); - } - - if (null != s3AsyncClient) { // cleanup usage in analytics accelerator if any if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) { - AnalyticsAcceleratorUtil.cleanupCache(s3AsyncClient, s3FileIOProperties); + AnalyticsAcceleratorUtil.cleanupCache(s3Client, s3FileIOProperties); } - s3AsyncClient.close(); + s3Client.close(); } } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index d5e51ed74ad6..0f956146c2f9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -61,7 +61,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; @@ -98,7 +97,6 @@ public class S3FileIO private String credential = null; private SerializableSupplier s3; - private SerializableSupplier s3Async; private SerializableMap properties = null; private MetricsContext metrics = MetricsContext.nullMetrics(); private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); @@ -122,20 +120,7 @@ public S3FileIO() {} * @param s3 s3 supplier */ public S3FileIO(SerializableSupplier s3) { - this(s3, null); - } - - /** - * Constructor with custom s3 supplier and s3Async supplier. - * - *

Calling {@link S3FileIO#initialize(Map)} will overwrite information set in this constructor. - * - * @param s3 s3 supplier - * @param s3Async s3Async supplier - */ - public S3FileIO(SerializableSupplier s3, SerializableSupplier s3Async) { this.s3 = s3; - this.s3Async = s3Async; this.createStack = Thread.currentThread().getStackTrace(); this.properties = SerializableMap.copyOf(Maps.newHashMap()); } @@ -367,15 +352,6 @@ public S3Client client(String storagePath) { return clientForStoragePath(storagePath).s3(); } - public S3AsyncClient asyncClient() { - return asyncClient(ROOT_PREFIX); - } - - @SuppressWarnings("resource") - public S3AsyncClient asyncClient(String storagePath) { - return clientForStoragePath(storagePath).s3Async(); - } - @VisibleForTesting PrefixedS3Client clientForStoragePath(String storagePath) { PrefixedS3Client client; @@ -401,8 +377,7 @@ private Map clientByPrefix() { if (null == clientByPrefix) { Map localClientByPrefix = Maps.newHashMap(); - localClientByPrefix.put( - ROOT_PREFIX, new PrefixedS3Client(ROOT_PREFIX, properties, s3, s3Async)); + localClientByPrefix.put(ROOT_PREFIX, new PrefixedS3Client(ROOT_PREFIX, properties, s3)); storageCredentials.stream() .filter(c -> c.prefix().startsWith(ROOT_PREFIX)) .collect(Collectors.toList()) @@ -417,7 +392,7 @@ private Map clientByPrefix() { localClientByPrefix.put( storageCredential.prefix(), new PrefixedS3Client( - storageCredential.prefix(), propertiesWithCredentials, s3, s3Async)); + storageCredential.prefix(), propertiesWithCredentials, s3)); }); this.clientByPrefix = localClientByPrefix; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java index 4b2d5ccb1504..718298818a30 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOAwsClientFactory.java @@ -20,7 +20,6 @@ import java.io.Serializable; import java.util.Map; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public interface S3FileIOAwsClientFactory extends Serializable { @@ -31,13 +30,6 @@ public interface S3FileIOAwsClientFactory extends Serializable { */ S3Client s3(); - /** - * create a Amazon S3 async client - * - * @return s3 async client - */ - S3AsyncClient s3Async(); - /** * Initialize AWS client factory from catalog properties. * diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 6bf582f00bbb..4157b3d34952 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -1021,13 +1021,12 @@ public void applySignerConfiguration(T builder) { } /** - * Override the endpoint for an S3 sync or async client. + * Override the endpoint for an S3 sync client. * *

Sample usage: * *

    *     S3Client.builder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
-   *     S3AsyncClient.builder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
    * 
*/ public > void applyEndpointConfigurations(T builder) { @@ -1036,21 +1035,6 @@ public void applySignerConfiguration(T builder) { } } - /** - * Override the endpoint for an S3 CRT client - * - *

Sample usage: - * - *

-   *     S3AsyncClient.crtBuilder().applyMutation(s3FileIOProperties::applyEndpointConfigurations)
-   * 
- */ - public void applyEndpointConfigurations(T builder) { - if (endpoint != null) { - builder.endpointOverride(URI.create(endpoint)); - } - } - /** * Override the retry configurations for an S3 client. * diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java index 5e4346fe9f9f..a11925d72ba9 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java @@ -23,7 +23,6 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.MetricsContext; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryptedFile { @@ -39,7 +38,6 @@ static S3InputFile fromLocation( String location, long length, PrefixedS3Client client, MetricsContext metrics) { return new S3InputFile( client.s3(), - client.s3FileIOProperties().isS3AnalyticsAcceleratorEnabled() ? client.s3Async() : null, new S3URI(location, client.s3FileIOProperties().bucketToAccessPointMapping()), length > 0 ? length : null, client.s3FileIOProperties(), @@ -48,12 +46,11 @@ static S3InputFile fromLocation( S3InputFile( S3Client client, - S3AsyncClient asyncClient, S3URI uri, Long length, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { - super(client, asyncClient, uri, s3FileIOProperties, metrics); + super(client, uri, s3FileIOProperties, metrics); this.length = length; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java index 3fcd3cdbd5eb..1ca30ec6bf39 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java @@ -27,7 +27,6 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.metrics.MetricsContext; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; public class S3OutputFile extends BaseS3File implements OutputFile, NativelyEncryptedFile { @@ -37,19 +36,14 @@ static S3OutputFile fromLocation( String location, PrefixedS3Client client, MetricsContext metrics) { return new S3OutputFile( client.s3(), - client.s3FileIOProperties().isS3AnalyticsAcceleratorEnabled() ? client.s3Async() : null, new S3URI(location, client.s3FileIOProperties().bucketToAccessPointMapping()), client.s3FileIOProperties(), metrics); } S3OutputFile( - S3Client client, - S3AsyncClient asyncClient, - S3URI uri, - S3FileIOProperties s3FileIOProperties, - MetricsContext metrics) { - super(client, asyncClient, uri, s3FileIOProperties, metrics); + S3Client client, S3URI uri, S3FileIOProperties s3FileIOProperties, MetricsContext metrics) { + super(client, uri, s3FileIOProperties, metrics); } /** @@ -78,7 +72,7 @@ public PositionOutputStream createOrOverwrite() { @Override public InputFile toInputFile() { - return new S3InputFile(client(), asyncClient(), uri(), null, s3FileIOProperties(), metrics()); + return new S3InputFile(client(), uri(), null, s3FileIOProperties(), metrics()); } @Override diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java index fe95f9364673..1a7bbe06fc85 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java +++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java @@ -28,7 +28,6 @@ import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.SerializationUtil; import org.assertj.core.api.ThrowableAssert; @@ -43,9 +42,7 @@ import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.GetTablesRequest; import software.amazon.awssdk.services.kms.KmsClient; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.internal.crt.DefaultS3CrtAsyncClient; public class TestAwsClientFactories { @@ -60,55 +57,6 @@ public void testLoadDefault() { .isInstanceOf(AwsClientFactories.DefaultAwsClientFactory.class); } - @Test - public void testS3AsyncClientCrtEnabled() { - assertThat( - AwsClientFactories.from( - ImmutableMap.of( - S3FileIOProperties.ACCESS_KEY_ID, - "keyId", - S3FileIOProperties.SECRET_ACCESS_KEY, - "accessKey", - S3FileIOProperties.S3_CRT_ENABLED, - "true", - AwsClientProperties.CLIENT_REGION, - "us-east-1")) - .s3Async()) - .isInstanceOf(DefaultS3CrtAsyncClient.class); - } - - @Test - public void testS3AsyncClientWithCrtDisabled() { - assertThat( - AwsClientFactories.from( - ImmutableMap.of( - S3FileIOProperties.ACCESS_KEY_ID, - "keyId", - S3FileIOProperties.SECRET_ACCESS_KEY, - "accessKey", - S3FileIOProperties.S3_CRT_ENABLED, - "false", - AwsClientProperties.CLIENT_REGION, - "us-east-1")) - .s3Async()) - .isNotInstanceOf(DefaultS3CrtAsyncClient.class); - } - - @Test - public void testS3AsyncClientDefaultIsCrt() { - assertThat( - AwsClientFactories.from( - ImmutableMap.of( - S3FileIOProperties.ACCESS_KEY_ID, - "keyId", - S3FileIOProperties.SECRET_ACCESS_KEY, - "accessKey", - AwsClientProperties.CLIENT_REGION, - "us-east-1")) - .s3Async()) - .isInstanceOf(DefaultS3CrtAsyncClient.class); - } - @Test public void testLoadCustom() { Map properties = Maps.newHashMap(); @@ -352,11 +300,6 @@ public S3Client s3() { return null; } - @Override - public S3AsyncClient s3Async() { - return null; - } - @Override public GlueClient glue() { return null; diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java index 76f47adb17e8..6a2edcf01ed2 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java @@ -23,7 +23,6 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; class StaticClientFactory implements AwsClientFactory { @@ -34,11 +33,6 @@ public S3Client s3() { return client; } - @Override - public S3AsyncClient s3Async() { - return null; - } - @Override public GlueClient glue() { return null; diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestPrefixedS3Client.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestPrefixedS3Client.java index 18be1c915bea..8f4838e6d6c4 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestPrefixedS3Client.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestPrefixedS3Client.java @@ -25,7 +25,6 @@ import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; @SuppressWarnings("resource") @@ -33,15 +32,15 @@ public class TestPrefixedS3Client { @Test public void invalidParameters() { - assertThatThrownBy(() -> new PrefixedS3Client(null, null, null, null)) + assertThatThrownBy(() -> new PrefixedS3Client(null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid storage prefix: null or empty"); - assertThatThrownBy(() -> new PrefixedS3Client("", null, null, null)) + assertThatThrownBy(() -> new PrefixedS3Client("", null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid storage prefix: null or empty"); - assertThatThrownBy(() -> new PrefixedS3Client("s3://bucket", null, null, null)) + assertThatThrownBy(() -> new PrefixedS3Client("s3://bucket", null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid properties: null"); } @@ -50,11 +49,10 @@ public void invalidParameters() { public void validParameters() { Map properties = ImmutableMap.of(AwsClientProperties.CLIENT_REGION, "us-east-1"); - PrefixedS3Client client = new PrefixedS3Client("s3", properties, null, null); + PrefixedS3Client client = new PrefixedS3Client("s3", properties, null); assertThat(client.storagePrefix()).isEqualTo("s3"); assertThat(client.s3FileIOProperties().properties()) .isEqualTo(new S3FileIOProperties(properties).properties()); assertThat(client.s3()).isInstanceOf(S3Client.class); - assertThat(client.s3Async()).isInstanceOf(S3AsyncClient.class); } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index 1666de1f1d08..b0d0c77db4e3 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -37,7 +37,6 @@ import org.mockito.Mockito; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; -import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; @@ -519,17 +518,12 @@ public void testApplyEndpointConfiguration() { properties.put(S3FileIOProperties.ENDPOINT, "endpoint"); S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class); - S3AsyncClientBuilder mockS3AsyncClientBuilder = Mockito.mock(S3AsyncClientBuilder.class); S3CrtAsyncClientBuilder mockS3CrtAsyncClientBuilder = Mockito.mock(S3CrtAsyncClientBuilder.class); s3FileIOProperties.applyEndpointConfigurations(mockS3ClientBuilder); - s3FileIOProperties.applyEndpointConfigurations(mockS3AsyncClientBuilder); - s3FileIOProperties.applyEndpointConfigurations(mockS3CrtAsyncClientBuilder); Mockito.verify(mockS3ClientBuilder).endpointOverride(Mockito.any(URI.class)); - Mockito.verify(mockS3AsyncClientBuilder).endpointOverride(Mockito.any(URI.class)); - Mockito.verify(mockS3CrtAsyncClientBuilder).endpointOverride(Mockito.any(URI.class)); } @Test