diff --git a/aws-bundle/build.gradle b/aws-bundle/build.gradle index 9e937aa0edb7..e988ac249dc0 100644 --- a/aws-bundle/build.gradle +++ b/aws-bundle/build.gradle @@ -27,6 +27,7 @@ project(":iceberg-aws-bundle") { implementation platform(libs.awssdk.bom) implementation libs.awssdk.s3accessgrants implementation "software.amazon.awssdk:apache-client" + implementation "software.amazon.awssdk:netty-nio-client" implementation "software.amazon.awssdk:auth" implementation "software.amazon.awssdk:crt-core" implementation "software.amazon.awssdk:http-auth-aws-crt" diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java index 177a627efcab..f0580a181c49 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.aws.glue; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -28,6 +29,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.aws.AwsIntegTestUtil; +import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.aws.s3.S3TestUtil; import org.apache.iceberg.aws.util.RetryDetector; import org.apache.iceberg.catalog.TableIdentifier; @@ -39,6 +41,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; import software.amazon.awssdk.core.metrics.CoreMetric; import 
software.amazon.awssdk.metrics.MetricCollector; @@ -156,10 +160,14 @@ public void testCheckCommitStatusAfterRetries() { .isEqualTo(2); } - @Test - public void testNoRetryAwarenessCorruptsTable() { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNoRetryAwarenessCorruptsTable(Map aalProperties) { // This test exists to replicate the issue the prior test validates the fix for // See https://github.com/apache/iceberg/issues/7151 + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(aalProperties), + "Analytics Accelerator Library does not support custom Iceberg exception: NotFoundException"); String namespace = createNamespace(); String tableName = createTable(namespace); TableIdentifier tableId = TableIdentifier.of(namespace, tableName); diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java index 90557720fa0e..17dc42749fdc 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java @@ -26,6 +26,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.LegacyMd5Plugin; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -73,4 +75,16 @@ public static S3Client createS3Client(MinIOContainer container, boolean legacyMd builder.forcePathStyle(true); // OSX won't resolve subdomains return builder.build(); } + + public static S3AsyncClient createS3AsyncClient(MinIOContainer container) { + URI uri = URI.create(container.getS3URL()); + S3AsyncClientBuilder builder = S3AsyncClient.builder(); + 
builder.credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(container.getUserName(), container.getPassword()))); + builder.applyMutation(mutator -> mutator.endpointOverride(uri)); + builder.region(Region.US_EAST_1); + builder.forcePathStyle(true); // OSX won't resolve subdomains + return builder.build(); + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java index 8700da07c449..64b93e6454ee 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java @@ -18,8 +18,20 @@ */ package org.apache.iceberg.aws.s3; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.params.provider.Arguments; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class S3TestUtil { + private static final Logger LOG = LoggerFactory.getLogger(S3TestUtil.class); + private S3TestUtil() {} public static String getBucketFromUri(String s3Uri) { @@ -29,4 +41,38 @@ public static String getBucketFromUri(String s3Uri) { public static String getKeyFromUri(String s3Uri) { return new S3URI(s3Uri).key(); } + + /** + * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled. 
+ * + * @param properties properties to probe + */ + public static void skipIfAnalyticsAcceleratorEnabled( + S3FileIOProperties properties, String message) { + boolean isAcceleratorEnabled = properties.isS3AnalyticsAcceleratorEnabled(); + if (isAcceleratorEnabled) { + LOG.warn(message); + } + assumeThat(!isAcceleratorEnabled).describedAs(message).isTrue(); + } + + public static Stream analyticsAcceleratorLibraryProperties() { + return listAnalyticsAcceleratorLibraryProperties().stream().map(Arguments::of); + } + + public static List> listAnalyticsAcceleratorLibraryProperties() { + return List.of( + ImmutableMap.of( + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, Boolean.toString(true)), + ImmutableMap.of( + S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, Boolean.toString(false))); + } + + public static Map mergeProperties( + Map aalProperties, Map testProperties) { + return ImmutableMap.builder() + .putAll(aalProperties) + .putAll(testProperties) + .build(); + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java index f98d1a3d4471..e4de9df4854b 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.listAnalyticsAcceleratorLibraryProperties; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -26,20 +27,33 @@ import java.io.IOException; import java.io.InputStream; +import java.net.SocketException; import java.net.SocketTimeoutException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import javax.net.ssl.SSLException; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ServiceClientConfiguration; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -48,19 +62,49 @@ import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.s3.analyticsaccelerator.util.retry.DefaultRetryStrategyImpl; +import software.amazon.s3.analyticsaccelerator.util.retry.RetryPolicy; +import software.amazon.s3.analyticsaccelerator.util.retry.RetryStrategy; public class TestFlakyS3InputStream extends TestS3InputStream { private AtomicInteger resetForRetryCounter; + private RetryStrategy retryStrategy; + private static 
final List> RETRYABLE_EXCEPTIONS = + ImmutableList.of(SSLException.class, SocketTimeoutException.class, SocketException.class); + private static final int MAX_RETRIES = 3; @BeforeEach public void setupTest() { resetForRetryCounter = new AtomicInteger(0); + retryStrategy = + new DefaultRetryStrategyImpl( + RetryPolicy.builder() + .handle(RETRYABLE_EXCEPTIONS) + .withMaxRetries(MAX_RETRIES) + .onRetry(this::reset) + .build()); + } + + private void reset() { + resetForRetryCounter.incrementAndGet(); } @Override - S3InputStream newInputStream(S3Client s3Client, S3URI uri) { - return new S3InputStream(s3Client, uri) { + SeekableInputStream newInputStream( + S3Client s3Client, + S3AsyncClient s3AsyncClient, + S3URI uri, + Map aalProperties, + MetricsContext metricsContext) { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) { + PrefixedS3Client client = + new PrefixedS3Client("s3", aalProperties, () -> s3Client, () -> s3AsyncClient); + return AnalyticsAcceleratorUtil.newStream( + S3InputFile.fromLocation(uri.location(), client, metricsContext), retryStrategy); + } + return new S3InputStream(s3Client, uri, s3FileIOProperties, metricsContext) { @Override void resetForRetry() throws IOException { resetForRetryCounter.incrementAndGet(); @@ -70,64 +114,114 @@ void resetForRetry() throws IOException { } @ParameterizedTest - @MethodSource("retryableExceptions") - public void testReadWithFlakyStreamRetrySucceed(IOException exception) throws Exception { - testRead(flakyStreamClient(new AtomicInteger(3), exception)); + @MethodSource("aalPropertiesAndRetryableExceptions") + public void testReadWithFlakyStreamRetrySucceed( + Map aalProperties, IOException exception) throws Exception { + testRead( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception), + aalProperties); assertThat(resetForRetryCounter.get()).isEqualTo(2); } 
@ParameterizedTest - @MethodSource("retryableExceptions") - public void testReadWithFlakyStreamExhaustedRetries(IOException exception) { - assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(5), exception))) + @MethodSource("aalPropertiesAndRetryableExceptions") + public void testReadWithFlakyStreamExhaustedRetries( + Map aalProperties, IOException exception) { + assertThatThrownBy( + () -> + testRead( + flakyStreamClient(new AtomicInteger(5), exception), + flakyStreamAsyncClient(new AtomicInteger(5), exception), + aalProperties)) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(3); } @ParameterizedTest - @MethodSource("nonRetryableExceptions") - public void testReadWithFlakyStreamNonRetryableException(IOException exception) { - assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(3), exception))) + @MethodSource("aalPropertiesAndNonRetryableExceptions") + public void testReadWithFlakyStreamNonRetryableException( + Map aalProperties, IOException exception) { + assertThatThrownBy( + () -> + testRead( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception), + aalProperties)) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(0); } @ParameterizedTest - @MethodSource("retryableExceptions") - public void testSeekWithFlakyStreamRetrySucceed(IOException exception) throws Exception { - testSeek(flakyStreamClient(new AtomicInteger(3), exception)); + @MethodSource("aalPropertiesAndRetryableExceptions") + public void testSeekWithFlakyStreamRetrySucceed( + Map aalProperties, IOException exception) throws Exception { + testSeek( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception), + aalProperties); assertThat(resetForRetryCounter.get()).isEqualTo(2); } @ParameterizedTest - 
@MethodSource("retryableExceptions") - public void testSeekWithFlakyStreamExhaustedRetries(IOException exception) { - assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(5), exception))) + @MethodSource("aalPropertiesAndRetryableExceptions") + public void testSeekWithFlakyStreamExhaustedRetries( + Map aalProperties, IOException exception) { + assertThatThrownBy( + () -> + testSeek( + flakyStreamClient(new AtomicInteger(5), exception), + flakyStreamAsyncClient(new AtomicInteger(5), exception), + aalProperties)) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(3); } @ParameterizedTest - @MethodSource("nonRetryableExceptions") - public void testSeekWithFlakyStreamNonRetryableException(IOException exception) { - assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(3), exception))) + @MethodSource("aalPropertiesAndNonRetryableExceptions") + public void testSeekWithFlakyStreamNonRetryableException( + Map aalProperties, IOException exception) { + assertThatThrownBy( + () -> + testSeek( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception), + aalProperties)) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(0); } - private static Stream retryableExceptions() { - return Stream.of( - Arguments.of( - new SocketTimeoutException("socket timeout exception"), - new SSLException("some ssl exception"))); + private static Stream aalPropertiesAndRetryableExceptions() { + return aalPropertiesAndExceptions(true); } - private static Stream nonRetryableExceptions() { - return Stream.of(Arguments.of(new IOException("some generic non-retryable IO exception"))); + private static Stream aalPropertiesAndNonRetryableExceptions() { + return aalPropertiesAndExceptions(false); + } + + private static Stream aalPropertiesAndExceptions(boolean retryable) { + List 
exceptions = retryable ? retryableExceptions() : nonRetryableExceptions(); + List results = Lists.newArrayList(); + for (Map aalProperties : listAnalyticsAcceleratorLibraryProperties()) { + for (IOException exception : exceptions) { + results.add(Arguments.of(aalProperties, exception)); + } + } + return results.stream(); + } + + private static List retryableExceptions() { + return Arrays.asList( + new SocketTimeoutException("socket timeout exception"), + new SSLException("some ssl exception")); + } + + private static List nonRetryableExceptions() { + return Arrays.asList(new IOException("some generic non-retryable IO exception")); } private S3ClientWrapper flakyStreamClient(AtomicInteger counter, IOException failure) { @@ -138,6 +232,29 @@ private S3ClientWrapper flakyStreamClient(AtomicInteger counter, IOException fai return flakyClient; } + private S3AsyncClientWrapper flakyStreamAsyncClient(AtomicInteger counter, IOException failure) { + S3AsyncClientWrapper flakyClient = spy(new S3AsyncClientWrapper(s3AsyncClient())); + doAnswer( + invocation -> + CompletableFuture.supplyAsync( + () -> { + try { + CompletableFuture> invocationFuture = + (CompletableFuture>) + invocation.callRealMethod(); + InputStream flakyInputStream = + new FlakyInputStream(invocationFuture.get(), counter, failure); + return new ResponseInputStream<>( + GetObjectResponse.builder().build(), flakyInputStream); + } catch (Throwable e) { + throw new RuntimeException(e); + } + })) + .when(flakyClient) + .getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)); + return flakyClient; + } + /** Wrapper for S3 client, used to mock the final class DefaultS3Client */ public static class S3ClientWrapper implements S3Client { @@ -184,6 +301,55 @@ public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest } } + /** Wrapper for S3 Async client, used to mock the final class DefaultS3AsyncClient */ + public static class S3AsyncClientWrapper implements S3AsyncClient { 
+ + private final S3AsyncClient delegate; + + public S3AsyncClientWrapper(S3AsyncClient delegate) { + this.delegate = delegate; + } + + @Override + public String serviceName() { + return delegate.serviceName(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public CompletableFuture getObject( + GetObjectRequest getObjectRequest, + AsyncResponseTransformer asyncResponseTransformer) { + return delegate.getObject(getObjectRequest, asyncResponseTransformer); + } + + @Override + public CompletableFuture headObject(HeadObjectRequest headObjectRequest) { + return delegate.headObject(headObjectRequest); + } + + @Override + public CompletableFuture putObject( + PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) { + return delegate.putObject(putObjectRequest, requestBody); + } + + @Override + public CompletableFuture createBucket( + CreateBucketRequest createBucketRequest) { + return delegate.createBucket(createBucketRequest); + } + + @Override + public S3ServiceClientConfiguration serviceClientConfiguration() { + return delegate.serviceClientConfiguration(); + } + } + static class FlakyInputStream extends InputStream { private final ResponseInputStream delegate; private final AtomicInteger counter; diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMetricsContext.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMetricsContext.java new file mode 100644 index 000000000000..edb69b3dd04c --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMetricsContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Test implementation of MetricsContext that maintains isolated counters for each test instance. + * Used instead of Hadoop metrics to avoid shared state between test runs + */ +public class TestMetricsContext implements MetricsContext { + private final Map counters = Maps.newConcurrentMap(); + + @Override + public org.apache.iceberg.metrics.Counter counter(String name, Unit unit) { + return counters.computeIfAbsent(name, k -> new LocalCounter()); + } + + private static class LocalCounter implements org.apache.iceberg.metrics.Counter { + private final AtomicLong count = new AtomicLong(0); + + @Override + public void increment() { + increment(1L); + } + + @Override + public void increment(long amount) { + count.addAndGet(amount); + } + + @Override + public long value() { + return count.get(); + } + } +} diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java index 9955aa7f8459..4f56455afcc1 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java @@ -22,12 +22,16 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.UUID; +import java.util.concurrent.CompletionException; import 
org.junit.jupiter.api.Test; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; @@ -72,4 +76,41 @@ void validateS3ConditionalWrites() { String responseBody = getResponse.asUtf8String(); assertThat(responseBody).isEqualTo("test-payload-0"); } + + @Test + void validateS3ConditionalWritesUsingAsyncClient() { + S3AsyncClient s3AsyncClient = MinioUtil.createS3AsyncClient(MINIO); + + String bucket = "test-bucket-" + UUID.randomUUID(); + + CreateBucketResponse createBucketResponse = + s3AsyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).join(); + assertThat(createBucketResponse.sdkHttpResponse().isSuccessful()).isTrue(); + + String key = "test-key-" + UUID.randomUUID().toString(); + for (int i = 0; i < 5; i++) { + String payload = "test-payload-" + i; + PutObjectRequest request = + PutObjectRequest.builder().bucket(bucket).key(key).ifNoneMatch("*").build(); + AsyncRequestBody body = AsyncRequestBody.fromString(payload); + if (i == 0) { + PutObjectResponse response = s3AsyncClient.putObject(request, body).join(); + assertThat(response.sdkHttpResponse().isSuccessful()).isTrue(); + } else { + assertThatThrownBy(() -> s3AsyncClient.putObject(request, body).join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(S3Exception.class) + .hasMessageContaining("Service: S3, Status Code: 412") + .hasMessageContaining("At least one of the pre-conditions 
you specified did not hold"); + } + } + + String responseBody = + s3AsyncClient + .getObject( + request -> request.bucket(bucket).key(key), AsyncResponseTransformer.toBytes()) + .join() + .asUtf8String(); + assertThat(responseBody).isEqualTo("test-payload-0"); + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index ebee07e53e4c..1dda41156d54 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.mergeProperties; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.AdditionalAnswers.delegatesTo; @@ -119,7 +121,10 @@ public class TestS3FileIO { private final SerializableSupplier s3 = () -> MinioUtil.createS3Client(minio, legacyMd5PluginEnabled()); + private final SerializableSupplier s3Async = + () -> MinioUtil.createS3AsyncClient(minio); private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3.get())); + private final S3AsyncClient s3Asyncmock = mock(S3AsyncClient.class, delegatesTo(s3Async.get())); private final Random random = new Random(1); private final int numBucketsForBatchDeletion = 3; private final String batchDeletionBucketPrefix = "batch-delete-"; @@ -148,13 +153,14 @@ protected boolean legacyMd5PluginEnabled() { @BeforeEach public void before() { - s3FileIO = new S3FileIO(() -> s3mock); + s3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); s3FileIO.initialize(properties); createBucket(S3_GENERAL_PURPOSE_BUCKET); for (int i = 1; i <= numBucketsForBatchDeletion; i++) { createBucket(batchDeletionBucketPrefix + i); } StaticClientFactory.client = 
s3mock; + StaticClientFactory.asyncClient = s3Asyncmock; } @AfterEach @@ -164,16 +170,19 @@ public void after() { } } - @Test - public void testNewInputFile() throws IOException { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNewInputFile(Map aalProperties) throws IOException { + final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); + testS3FileIO.initialize(mergeProperties(aalProperties, properties)); String location = "s3://bucket/path/to/file.txt"; byte[] expected = new byte[1024 * 1024]; random.nextBytes(expected); - InputFile in = s3FileIO.newInputFile(location); + InputFile in = testS3FileIO.newInputFile(location); assertThat(in.exists()).isFalse(); - OutputFile out = s3FileIO.newOutputFile(location); + OutputFile out = testS3FileIO.newOutputFile(location); try (OutputStream os = out.createOrOverwrite()) { IOUtil.writeFully(os, ByteBuffer.wrap(expected)); } @@ -187,9 +196,10 @@ public void testNewInputFile() throws IOException { assertThat(actual).isEqualTo(expected); - s3FileIO.deleteFile(in); + testS3FileIO.deleteFile(in); - assertThat(s3FileIO.newInputFile(location).exists()).isFalse(); + assertThat(testS3FileIO.newInputFile(location).exists()).isFalse(); + testS3FileIO.close(); } @Test @@ -275,8 +285,11 @@ public void testSerializeClient() throws IOException, ClassNotFoundException { assertThat(post.get().serviceName()).isEqualTo("s3"); } - @Test - public void testPrefixList() { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testPrefixList(Map aalProperties) { + final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); + testS3FileIO.initialize(mergeProperties(aalProperties, properties)); String prefix = "s3://bucket/path/to/list"; List scaleSizes = Lists.newArrayList(1, 1000, 2500); @@ -287,12 +300,14 @@ public void testPrefixList() { String 
scalePrefix = String.format("%s/%s/", prefix, scale); createRandomObjects(scalePrefix, scale); - assertThat(Streams.stream(s3FileIO.listPrefix(scalePrefix)).count()) + assertThat(Streams.stream(testS3FileIO.listPrefix(scalePrefix)).count()) .isEqualTo((long) scale); }); long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum(); - assertThat(Streams.stream(s3FileIO.listPrefix(prefix)).count()).isEqualTo(totalFiles); + assertThat(Streams.stream(testS3FileIO.listPrefix(prefix)).count()).isEqualTo(totalFiles); + testS3FileIO.deletePrefix(prefix); + testS3FileIO.close(); } /** @@ -396,18 +411,33 @@ public void testPrefixDelete() { }); } - @Test - public void testReadMissingLocation() { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testReadMissingLocation(Map aalProperties) { + final Map testProperties = mergeProperties(aalProperties, this.properties); + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(testProperties), + "Analytics Accelerator Library does not support custom Iceberg exception: NotFoundException"); + final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); + testS3FileIO.initialize(testProperties); String location = "s3://bucket/path/to/data.parquet"; - InputFile in = s3FileIO.newInputFile(location); + InputFile in = testS3FileIO.newInputFile(location); assertThatThrownBy(() -> in.newStream().read()) .isInstanceOf(NotFoundException.class) .hasMessage("Location does not exist: " + location); + testS3FileIO.close(); } - @Test - public void testMissingTableMetadata() { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testMissingTableMetadata(Map aalProperties) { + final Map testProperties = mergeProperties(aalProperties, this.properties); + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(testProperties), + "Analytics Accelerator Library 
does not support custom Iceberg exception: NotFoundException"); + final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); + testS3FileIO.initialize(testProperties); Map conf = Maps.newHashMap(); conf.put( CatalogProperties.URI, @@ -426,7 +456,7 @@ public void testMissingTableMetadata() { BaseTable table = (BaseTable) catalog.createTable(ident, schema); // delete the current metadata - s3FileIO.deleteFile(table.operations().current().metadataFileLocation()); + testS3FileIO.deleteFile(table.operations().current().metadataFileLocation()); long start = System.currentTimeMillis(); // to test NotFoundException, load the table again. refreshing the existing table doesn't @@ -438,6 +468,7 @@ public void testMissingTableMetadata() { long duration = System.currentTimeMillis() - start; assertThat(duration < 10_000).as("Should take less than 10 seconds").isTrue(); } + testS3FileIO.close(); } @Test @@ -572,8 +603,11 @@ public void testResolvingFileIOLoadWithoutConf() { assertThat(result).isInstanceOf(S3FileIO.class); } - @Test - public void testInputFileWithDataFile() throws IOException { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testInputFileWithDataFile(Map aalProperties) throws IOException { + final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); + testS3FileIO.initialize(mergeProperties(aalProperties, properties)); String location = "s3://bucket/path/to/data-file.parquet"; DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) @@ -582,22 +616,27 @@ public void testInputFileWithDataFile() throws IOException { .withFormat(FileFormat.PARQUET) .withRecordCount(123L) .build(); - OutputStream outputStream = s3FileIO.newOutputFile(location).create(); + OutputStream outputStream = testS3FileIO.newOutputFile(location).create(); byte[] data = "testing".getBytes(); outputStream.write(data); outputStream.close(); - InputFile inputFile = 
s3FileIO.newInputFile(dataFile); + InputFile inputFile = testS3FileIO.newInputFile(dataFile); reset(s3mock); assertThat(inputFile.getLength()) .as("Data file length should be determined from the file size stats") .isEqualTo(123L); verify(s3mock, never()).headObject(any(HeadObjectRequest.class)); + testS3FileIO.deleteFile(location); + testS3FileIO.close(); } - @Test - public void testInputFileWithManifest() throws IOException { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testInputFileWithManifest(Map aalProperties) throws IOException { + final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); + testS3FileIO.initialize(mergeProperties(aalProperties, properties)); String dataFileLocation = "s3://bucket/path/to/data-file-2.parquet"; DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) @@ -607,17 +646,19 @@ public void testInputFileWithManifest() throws IOException { .withRecordCount(123L) .build(); String manifestLocation = "s3://bucket/path/to/manifest.avro"; - OutputFile outputFile = s3FileIO.newOutputFile(manifestLocation); + OutputFile outputFile = testS3FileIO.newOutputFile(manifestLocation); ManifestWriter writer = ManifestFiles.write(PartitionSpec.unpartitioned(), outputFile); writer.add(dataFile); writer.close(); ManifestFile manifest = writer.toManifestFile(); - InputFile inputFile = s3FileIO.newInputFile(manifest); + InputFile inputFile = testS3FileIO.newInputFile(manifest); reset(s3mock); assertThat(inputFile.getLength()).isEqualTo(manifest.length()); verify(s3mock, never()).headObject(any(HeadObjectRequest.class)); + testS3FileIO.deleteFile(dataFileLocation); + testS3FileIO.close(); } @ParameterizedTest diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index 5fe410582ce5..9de86f7a84cc 100644 --- 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.mergeProperties; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; @@ -49,6 +51,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.regions.PartitionMetadata; import software.amazon.awssdk.regions.Region; @@ -159,18 +163,21 @@ public void beforeEach() { clientFactory.initialize(Maps.newHashMap()); } - @Test - public void testNewInputStream() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNewInputStream(Map aalProperties) throws Exception { s3.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), RequestBody.fromBytes(contentBytes)); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize(ImmutableMap.of()); + s3FileIO.initialize(aalProperties); validateRead(s3FileIO); } - @Test - public void testS3FileIOWithS3FileIOAwsClientFactoryImpl() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testS3FileIOWithS3FileIOAwsClientFactoryImpl(Map aalProperties) + throws Exception { s3.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), RequestBody.fromBytes(contentBytes)); @@ -179,38 +186,46 
@@ public void testS3FileIOWithS3FileIOAwsClientFactoryImpl() throws Exception { properties.put( S3FileIOProperties.CLIENT_FACTORY, "org.apache.iceberg.aws.s3.DefaultS3FileIOAwsClientFactory"); - s3FileIO.initialize(properties); + s3FileIO.initialize(mergeProperties(aalProperties, properties)); validateRead(s3FileIO); } - @Test - public void testS3FileIOWithDefaultAwsClientFactoryImpl() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testS3FileIOWithDefaultAwsClientFactoryImpl(Map aalProperties) + throws Exception { s3.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), RequestBody.fromBytes(contentBytes)); S3FileIO s3FileIO = new S3FileIO(); - s3FileIO.initialize(Maps.newHashMap()); + s3FileIO.initialize(aalProperties); validateRead(s3FileIO); } - @Test - public void testNewInputStreamWithAccessPoint() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNewInputStreamWithAccessPoint(Map aalProperties) + throws Exception { requireAccessPointSupport(); s3.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), RequestBody.fromBytes(contentBytes)); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map testProperties = ImmutableMap.of( S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName, - testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName))); + testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName)); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); validateRead(s3FileIO); } - @Test - public void testCrossRegionAccessEnabled() throws Exception { - clientFactory.initialize( - ImmutableMap.of(S3FileIOProperties.CROSS_REGION_ACCESS_ENABLED, "true")); + @ParameterizedTest + 
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testCrossRegionAccessEnabled(Map aalProperties) throws Exception { + Map testProperties = + ImmutableMap.of(S3FileIOProperties.CROSS_REGION_ACCESS_ENABLED, "true"); + Map properties = mergeProperties(aalProperties, testProperties); + clientFactory.initialize(properties); S3Client s3Client = clientFactory.s3(); String crossBucketObjectKey = String.format("%s/%s", prefix, UUID.randomUUID()); String crossBucketObjectUri = @@ -223,7 +238,7 @@ public void testCrossRegionAccessEnabled() throws Exception { .build(), RequestBody.fromBytes(contentBytes)); // make a copy in cross-region bucket - S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); + S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, clientFactory::s3Async); validateRead(s3FileIO, crossBucketObjectUri); } finally { AwsIntegTestUtil.cleanS3GeneralPurposeBucket( @@ -231,10 +246,15 @@ public void testCrossRegionAccessEnabled() throws Exception { } } - @Test - public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNewInputStreamWithCrossRegionAccessPoint(Map aalProperties) + throws Exception { requireAccessPointSupport(); - clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true")); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"); + Map properties = mergeProperties(aalProperties, testProperties); + clientFactory.initialize(properties); S3Client s3Client = clientFactory.s3(); s3Client.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), @@ -248,38 +268,33 @@ public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception { .build(), RequestBody.fromBytes(contentBytes)); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - 
s3FileIO.initialize( + Map accessPointProperties = ImmutableMap.of( S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName, - testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName))); + testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName)); + s3FileIO.initialize(mergeProperties(aalProperties, accessPointProperties)); validateRead(s3FileIO); } - @Test - public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNewInputStreamWithMultiRegionAccessPoint(Map aalProperties) + throws Exception { Assumptions.assumeThat(multiRegionAccessPointAlias).isNotEmpty(); - clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true")); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"); + clientFactory.initialize(mergeProperties(aalProperties, testProperties)); S3Client s3Client = clientFactory.s3(); s3Client.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), RequestBody.fromBytes(contentBytes)); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map accessPointProperties = ImmutableMap.of( S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName, testMultiRegionAccessPointARN( - AwsIntegTestUtil.testRegion(), multiRegionAccessPointAlias))); - validateRead(s3FileIO); - } - - @Test - public void testNewInputStreamWithAnalyticsAccelerator() throws Exception { - s3.putObject( - PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), - RequestBody.fromBytes(contentBytes)); - S3FileIO s3FileIO = new S3FileIO(); - s3FileIO.initialize( - ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true))); + AwsIntegTestUtil.testRegion(), multiRegionAccessPointAlias)); + s3FileIO.initialize(mergeProperties(aalProperties, 
accessPointProperties)); validateRead(s3FileIO); } @@ -397,11 +412,13 @@ public void testNewOutputStreamWithAnalyticsAccelerator() throws Exception { } } - @Test - public void testServerSideS3Encryption() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testServerSideS3Encryption(Map aalProperties) throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( - ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_S3)); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_S3); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); write(s3FileIO); validateRead(s3FileIO); GetObjectResponse response = @@ -410,16 +427,18 @@ public void testServerSideS3Encryption() throws Exception { assertThat(response.serverSideEncryption()).isEqualTo(ServerSideEncryption.AES256); } - @Test - public void testServerSideKmsEncryption() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testServerSideKmsEncryption(Map aalProperties) throws Exception { requireKMSEncryptionSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map testProperties = ImmutableMap.of( S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_KMS, S3FileIOProperties.SSE_KEY, - kmsKeyArn)); + kmsKeyArn); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); write(s3FileIO); validateRead(s3FileIO); GetObjectResponse response = @@ -429,12 +448,15 @@ public void testServerSideKmsEncryption() throws Exception { assertThat(kmsKeyArn).isEqualTo(response.ssekmsKeyId()); } - @Test - public void testServerSideKmsEncryptionWithDefaultKey() throws Exception { + @ParameterizedTest + 
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testServerSideKmsEncryptionWithDefaultKey(Map aalProperties) + throws Exception { requireKMSEncryptionSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( - ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_KMS)); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_KMS); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); write(s3FileIO); validateRead(s3FileIO); GetObjectResponse response = @@ -451,16 +473,19 @@ public void testServerSideKmsEncryptionWithDefaultKey() throws Exception { aliasListEntry -> assertThat(aliasListEntry.aliasName()).isEqualTo("alias/aws/s3")); } - @Test - public void testDualLayerServerSideKmsEncryption() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testDualLayerServerSideKmsEncryption(Map aalProperties) + throws Exception { requireKMSEncryptionSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map testProperties = ImmutableMap.of( S3FileIOProperties.SSE_TYPE, S3FileIOProperties.DSSE_TYPE_KMS, S3FileIOProperties.SSE_KEY, - kmsKeyArn)); + kmsKeyArn); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); write(s3FileIO); validateRead(s3FileIO); GetObjectResponse response = @@ -470,8 +495,9 @@ public void testDualLayerServerSideKmsEncryption() throws Exception { assertThat(response.ssekmsKeyId()).isEqualTo(kmsKeyArn); } - @Test - public void testServerSideCustomEncryption() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testServerSideCustomEncryption(Map aalProperties) throws Exception { requireKMSEncryptionSupport(); // generate key KeyGenerator keyGenerator = 
KeyGenerator.getInstance("AES"); @@ -485,14 +511,16 @@ public void testServerSideCustomEncryption() throws Exception { new String(encoder.encode(digest.digest(secretKey.getEncoded())), StandardCharsets.UTF_8); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map testProperties = ImmutableMap.of( - S3FileIOProperties.SSE_TYPE, - S3FileIOProperties.SSE_TYPE_CUSTOM, - S3FileIOProperties.SSE_KEY, - encodedKey, - S3FileIOProperties.SSE_MD5, - md5)); + S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_CUSTOM, + S3FileIOProperties.SSE_KEY, encodedKey, + S3FileIOProperties.SSE_MD5, md5); + Map properties = mergeProperties(aalProperties, testProperties); + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(properties), + "Analytics Accelerator Library does not support Server Side Custom Encryption"); + s3FileIO.initialize(properties); write(s3FileIO); validateRead(s3FileIO); GetObjectResponse response = @@ -510,13 +538,15 @@ public void testServerSideCustomEncryption() throws Exception { assertThat(response.sseCustomerKeyMD5()).isEqualTo(md5); } - @Test - public void testACL() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testACL(Map aalProperties) throws Exception { requireACLSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map testProperties = ImmutableMap.of( - S3FileIOProperties.ACL, ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL.toString())); + S3FileIOProperties.ACL, ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL.toString()); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); write(s3FileIO); validateRead(s3FileIO); @@ -529,65 +559,81 @@ public void testACL() throws Exception { .satisfies(grant -> assertThat(grant.permission()).isEqualTo(Permission.FULL_CONTROL)); } - @Test - public void testClientFactorySerialization() throws Exception { + @ParameterizedTest + 
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testClientFactorySerialization(Map aalProperties) throws Exception { S3FileIO fileIO = new S3FileIO(clientFactory::s3); - fileIO.initialize(ImmutableMap.of()); + fileIO.initialize(aalProperties); write(fileIO); byte[] data = TestHelpers.serialize(fileIO); S3FileIO fileIO2 = TestHelpers.deserialize(data); validateRead(fileIO2); } - @Test - public void testDeleteFilesMultipleBatches() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testDeleteFilesMultipleBatches(Map aalProperties) throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( - ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize))); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize)); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); testDeleteFiles(deletionBatchSize * 2, s3FileIO); } - @Test - public void testDeleteFilesMultipleBatchesWithAccessPoints() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testDeleteFilesMultipleBatchesWithAccessPoints(Map aalProperties) + throws Exception { requireAccessPointSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map testProperties = ImmutableMap.of( S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize), S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName, - testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName))); + testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName)); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); testDeleteFiles(deletionBatchSize * 2, s3FileIO); } - @Test - public void 
testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints( + Map aalProperties) throws Exception { requireKMSEncryptionSupport(); - clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true")); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"); + clientFactory.initialize(mergeProperties(aalProperties, testProperties)); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map accessPointProperties = ImmutableMap.of( S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize), S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName, - testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName))); + testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName)); + s3FileIO.initialize(mergeProperties(aalProperties, accessPointProperties)); testDeleteFiles(deletionBatchSize * 2, s3FileIO); } - @Test - public void testDeleteFilesLessThanBatchSize() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testDeleteFilesLessThanBatchSize(Map aalProperties) throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( - ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize))); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize)); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); testDeleteFiles(deletionBatchSize - 1, s3FileIO); } - @Test - public void testDeleteFilesSingleBatchWithRemainder() throws Exception { + @ParameterizedTest + 
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testDeleteFilesSingleBatchWithRemainder(Map aalProperties) + throws Exception { S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( - ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize))); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize)); + s3FileIO.initialize(mergeProperties(aalProperties, testProperties)); testDeleteFiles(5, s3FileIO); } @@ -629,11 +675,12 @@ public void testPrefixDelete() { }); } - @Test - public void testFileRecoveryHappyPath() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testFileRecoveryHappyPath(Map aalProperties) throws Exception { requireVersioningSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize(ImmutableMap.of()); + s3FileIO.initialize(aalProperties); String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet"); write(s3FileIO, filePath); s3FileIO.deleteFile(filePath); @@ -641,13 +688,15 @@ public void testFileRecoveryHappyPath() throws Exception { assertThat(s3FileIO.recoverFile(filePath)).isTrue(); assertThat(s3FileIO.newInputFile(filePath).exists()).isTrue(); + s3FileIO.deleteFile(filePath); } - @Test - public void testFileRecoveryFailsToRecover() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testFileRecoveryFailsToRecover(Map aalProperties) throws Exception { requireVersioningSupport(); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize(ImmutableMap.of()); + s3FileIO.initialize(aalProperties); s3.putBucketVersioning( PutBucketVersioningRequest.builder() .bucket(bucketName) diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index f8903842df37..69a08f528e66 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -18,21 +18,28 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.util.Arrays; +import java.util.Map; import java.util.Random; +import org.apache.iceberg.io.FileIOMetricsContext; import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.Counter; +import org.apache.iceberg.metrics.MetricsContext; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException; import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException; @@ -44,6 +51,7 @@ public class TestS3InputStream { @Container private static final MinIOContainer MINIO = MinioUtil.createContainer(); private final S3Client s3 = MinioUtil.createS3Client(MINIO); + private final S3AsyncClient s3Async = MinioUtil.createS3AsyncClient(MINIO); private final Random random = new Random(1); @BeforeEach 
@@ -51,23 +59,39 @@ public void before() { createBucket("bucket"); } - @Test - public void testRead() throws Exception { - testRead(s3); + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testRead(Map aalProperties) throws Exception { + testRead(s3, s3Async, aalProperties); } - S3InputStream newInputStream(S3Client s3Client, S3URI uri) { - return new S3InputStream(s3Client, uri); + SeekableInputStream newInputStream( + S3Client s3Client, + S3AsyncClient s3AsyncClient, + S3URI uri, + Map aalProperties, + MetricsContext metricsContext) { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) { + PrefixedS3Client client = + new PrefixedS3Client("s3", aalProperties, () -> s3Client, () -> s3AsyncClient); + return AnalyticsAcceleratorUtil.newStream( + S3InputFile.fromLocation(uri.location(), client, metricsContext)); + } + return new S3InputStream(s3Client, uri, s3FileIOProperties, metricsContext); } - protected void testRead(S3Client s3Client) throws Exception { + protected void testRead( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/read.dat"); int dataSize = 1024 * 1024 * 10; byte[] data = randomData(dataSize); writeS3Data(uri, data); - try (SeekableInputStream in = newInputStream(s3Client, uri)) { + try (SeekableInputStream in = + newInputStream(s3Client, s3AsyncClient, uri, aalProperties, MetricsContext.nullMetrics())) { int readSize = 1024; readAndCheck(in, in.getPos(), readSize, data, false); readAndCheck(in, in.getPos(), readSize, data, true); @@ -114,12 +138,15 @@ private void readAndCheck( assertThat(actual).isEqualTo(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd)); } - @Test - public void testRangeRead() throws Exception { - testRangeRead(s3); + @ParameterizedTest + 
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testRangeRead(Map aalProperties) throws Exception { + testRangeRead(s3, s3Async, aalProperties); } - protected void testRangeRead(S3Client s3Client) throws Exception { + protected void testRangeRead( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat"); int dataSize = 1024 * 1024 * 10; byte[] expected = randomData(dataSize); @@ -131,7 +158,10 @@ protected void testRangeRead(S3Client s3Client) throws Exception { writeS3Data(uri, expected); - try (RangeReadable in = newInputStream(s3Client, uri)) { + try (RangeReadable in = + (RangeReadable) + newInputStream( + s3Client, s3AsyncClient, uri, aalProperties, MetricsContext.nullMetrics())) { // first 1k position = 0; offset = 0; @@ -160,28 +190,38 @@ private void readAndCheckRanges( .isEqualTo(Arrays.copyOfRange(original, offset, offset + length)); } - @Test - public void testClose() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testClose(Map aalProperties) throws Exception { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, + "Analytics Accelerator Library has different exception handling when closed"); S3URI uri = new S3URI("s3://bucket/path/to/closed.dat"); - SeekableInputStream closed = newInputStream(s3, uri); + SeekableInputStream closed = + newInputStream(s3, s3Async, uri, aalProperties, MetricsContext.nullMetrics()); closed.close(); assertThatThrownBy(() -> closed.seek(0)) .isInstanceOf(IllegalStateException.class) .hasMessage("already closed"); } - @Test - public void testSeek() throws Exception { - testSeek(s3); + @ParameterizedTest + 
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testSeek(Map aalProperties) throws Exception { + testSeek(s3, s3Async, aalProperties); } - protected void testSeek(S3Client s3Client) throws Exception { + protected void testSeek( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/seek.dat"); byte[] expected = randomData(1024 * 1024); writeS3Data(uri, expected); - try (SeekableInputStream in = newInputStream(s3Client, uri)) { + try (SeekableInputStream in = + newInputStream(s3Client, s3AsyncClient, uri, aalProperties, MetricsContext.nullMetrics())) { in.seek(expected.length / 2); byte[] actual = new byte[expected.length / 2]; IOUtil.readFully(in, actual, 0, expected.length / 2); @@ -190,6 +230,39 @@ protected void testSeek(S3Client s3Client) throws Exception { } } + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testMetrics(Map aalProperties) throws Exception { + testMetrics(s3, s3Async, aalProperties); + } + + protected void testMetrics( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { + MetricsContext metricsContext = new TestMetricsContext(); + Counter readBytes = metricsContext.counter(FileIOMetricsContext.READ_BYTES); + Counter readOperations = metricsContext.counter(FileIOMetricsContext.READ_OPERATIONS); + + S3URI uri = new S3URI("s3://bucket/path/to/metrics.dat"); + int dataSize = 1024 * 1024 * 10; + byte[] data = randomData(dataSize); + + writeS3Data(uri, data); + + try (SeekableInputStream in = + newInputStream(s3Client, s3AsyncClient, uri, aalProperties, metricsContext)) { + int readSize = 1024; + + readAndCheck(in, in.getPos(), readSize, data, false); + assertThat(readBytes.value()).isEqualTo(readSize); + assertThat(readOperations.value()).isEqualTo(readSize); + + readAndCheck(in, in.getPos(), 
readSize, data, true); + assertThat(readBytes.value()).isEqualTo(readSize * 2L); + assertThat(readOperations.value()).isEqualTo(readSize + 1L); + } + } + private byte[] randomData(int size) { byte[] data = new byte[size]; random.nextBytes(data); @@ -217,4 +290,8 @@ private void createBucket(String bucketName) { protected S3Client s3Client() { return s3; } + + protected S3AsyncClient s3AsyncClient() { + return s3Async; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java index 59a4d8d3ac38..8650c387bac0 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java @@ -39,6 +39,7 @@ public class AssumeRoleAwsClientFactory implements AwsClientFactory { private AwsProperties awsProperties; private HttpClientProperties httpClientProperties; + private HttpAsyncClientProperties httpAsyncClientProperties; private S3FileIOProperties s3FileIOProperties; private String roleSessionName; private AwsClientProperties awsClientProperties; @@ -61,18 +62,19 @@ public S3AsyncClient s3Async() { if (s3FileIOProperties.isS3CRTEnabled()) { return S3AsyncClient.crtBuilder() .applyMutation(this::applyAssumeRoleConfigurations) - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) + .applyMutation(httpAsyncClientProperties::applyHttpAsyncClientConfigurations) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .applyMutation(s3FileIOProperties::applyS3CrtConfigurations) + .applyMutation(s3FileIOProperties::applyServiceConfigurations) .build(); } return S3AsyncClient.builder() .applyMutation(this::applyAssumeRoleConfigurations) - .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) 
.applyMutation(awsClientProperties::applyLegacyMd5Plugin) + .applyMutation(httpAsyncClientProperties::applyHttpAsyncClientConfigurations) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyServiceConfigurations) + .applyMutation(s3FileIOProperties::applySignerConfiguration) + .applyMutation(s3FileIOProperties::applyRetryConfigurations) .build(); } @@ -107,6 +109,7 @@ public void initialize(Map properties) { this.awsProperties = new AwsProperties(properties); this.s3FileIOProperties = new S3FileIOProperties(properties); this.httpClientProperties = new HttpClientProperties(properties); + this.httpAsyncClientProperties = new HttpAsyncClientProperties(properties); this.awsClientProperties = new AwsClientProperties(properties); this.roleSessionName = genSessionName(); Preconditions.checkNotNull( @@ -150,6 +153,10 @@ protected HttpClientProperties httpClientProperties() { return httpClientProperties; } + protected HttpAsyncClientProperties httpAsyncClientProperties() { + return httpAsyncClientProperties; + } + protected S3FileIOProperties s3FileIOProperties() { return s3FileIOProperties; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java index cd5715b93b63..324243896216 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java @@ -95,12 +95,14 @@ static class DefaultAwsClientFactory implements AwsClientFactory { private AwsClientProperties awsClientProperties; private S3FileIOProperties s3FileIOProperties; private HttpClientProperties httpClientProperties; + private HttpAsyncClientProperties httpAsyncClientProperties; DefaultAwsClientFactory() { awsProperties = new AwsProperties(); awsClientProperties = new AwsClientProperties(); s3FileIOProperties = new S3FileIOProperties(); httpClientProperties = new HttpClientProperties(); + 
httpAsyncClientProperties = new HttpAsyncClientProperties(); } @Override @@ -125,18 +127,25 @@ public S3AsyncClient s3Async() { if (s3FileIOProperties.isS3CRTEnabled()) { return S3AsyncClient.crtBuilder() .applyMutation(awsClientProperties::applyClientRegionConfiguration) + .applyMutation(httpAsyncClientProperties::applyHttpAsyncClientConfigurations) + .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyServiceConfigurations) .applyMutation( b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b)) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .applyMutation(s3FileIOProperties::applyS3CrtConfigurations) .build(); } return S3AsyncClient.builder() .applyMutation(awsClientProperties::applyClientRegionConfiguration) .applyMutation(awsClientProperties::applyLegacyMd5Plugin) + .applyMutation(httpAsyncClientProperties::applyHttpAsyncClientConfigurations) + .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyServiceConfigurations) .applyMutation( b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b)) - .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applySignerConfiguration) + .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) + .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) + .applyMutation(s3FileIOProperties::applyRetryConfigurations) .build(); } @@ -176,6 +185,7 @@ public void initialize(Map properties) { this.awsClientProperties = new AwsClientProperties(properties); this.s3FileIOProperties = new S3FileIOProperties(properties); this.httpClientProperties = new HttpClientProperties(properties); + this.httpAsyncClientProperties = new HttpAsyncClientProperties(properties); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java 
b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java index cf73e80f44c1..b139aeb0a94c 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java @@ -179,21 +179,6 @@ void applyClientCredentialConfigurations(BuilderT builder) { } } - /** - * Configure the credential provider for S3 CRT clients. - * - *

Sample usage: - * - *

-   *     S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientCredentialConfigurations)
-   * 
- */ - public void applyClientCredentialConfigurations(T builder) { - if (!Strings.isNullOrEmpty(this.clientCredentialsProvider)) { - builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider)); - } - } - /** * Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an * instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new diff --git a/aws/src/main/java/org/apache/iceberg/aws/CrtHttpAsyncClientConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/CrtHttpAsyncClientConfigurations.java new file mode 100644 index 000000000000..81b7ffce6b69 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/CrtHttpAsyncClientConfigurations.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.aws; + +import java.time.Duration; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.util.PropertyUtil; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; +import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; + +class CrtHttpAsyncClientConfigurations { + private Integer maxConcurrency; + private Long connectionTimeoutMs; + + private CrtHttpAsyncClientConfigurations() {} + + public void configureHttpAsyncClientBuilder( + T s3CrtAsyncClientBuilder) { + configureCrtHttpAsyncClientBuilder(s3CrtAsyncClientBuilder); + } + + private void initialize(Map httpAsyncClientProperties) { + this.maxConcurrency = + PropertyUtil.propertyAsInt( + httpAsyncClientProperties, + HttpAsyncClientProperties.CRT_MAX_CONCURRENCY, + HttpAsyncClientProperties.CRT_MAX_CONCURRENCY_DEFAULT); + this.connectionTimeoutMs = + PropertyUtil.propertyAsNullableLong( + httpAsyncClientProperties, HttpAsyncClientProperties.CRT_CONNECTION_TIMEOUT_MS); + } + + @VisibleForTesting + void configureCrtHttpAsyncClientBuilder(S3CrtAsyncClientBuilder crtBuilder) { + if (connectionTimeoutMs != null) { + crtBuilder.httpConfiguration( + S3CrtHttpConfiguration.builder() + .connectionTimeout(Duration.ofMillis(connectionTimeoutMs)) + .build()); + } + if (maxConcurrency != null) { + crtBuilder.maxConcurrency(maxConcurrency); + } + } + + public static CrtHttpAsyncClientConfigurations create(Map properties) { + CrtHttpAsyncClientConfigurations configurations = new CrtHttpAsyncClientConfigurations(); + configurations.initialize(properties); + return configurations; + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/HttpAsyncClientProperties.java b/aws/src/main/java/org/apache/iceberg/aws/HttpAsyncClientProperties.java new file mode 100644 index 000000000000..9d0c841c3fa7 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/HttpAsyncClientProperties.java @@ 
-0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.util.PropertyUtil; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; + +/** + * Configuration properties for HTTP async clients used by AWS services. + * + *

The client type (Netty vs CRT) is determined by the {@code s3.crt.enabled} property in + * S3FileIOProperties. When CRT is enabled, CRT-specific properties are used; when disabled, + * Netty-specific properties are used. + */ +public class HttpAsyncClientProperties implements Serializable { + + private static final String CLIENT_PREFIX = "http-async-client."; + + /** + * Used to configure the connection timeout in milliseconds for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_CONNECTION_TIMEOUT_MS = + "http-async-client.netty.connection-timeout-ms"; + + /** + * Used to configure the socket timeout in milliseconds for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_SOCKET_TIMEOUT_MS = "http-async-client.netty.socket-timeout-ms"; + + /** + * Used to configure the connection acquisition timeout in milliseconds for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_CONNECTION_ACQUISITION_TIMEOUT_MS = + "http-async-client.netty.connection-acquisition-timeout-ms"; + + /** + * Used to configure the connection max idle time in milliseconds for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_CONNECTION_MAX_IDLE_TIME_MS = + "http-async-client.netty.connection-max-idle-time-ms"; + + /** + * Used to configure the connection time to live in milliseconds for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_CONNECTION_TIME_TO_LIVE_MS = + "http-async-client.netty.connection-time-to-live-ms"; + + /** + * Used to configure the max connections number for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_MAX_CONNECTIONS = "http-async-client.netty.max-connections"; + + /** + * Used to configure whether to enable the tcp keep alive setting for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

By default, this is disabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_TCP_KEEP_ALIVE_ENABLED = + "http-async-client.netty.tcp-keep-alive-enabled"; + + /** + * Used to configure whether to use idle connection reaper for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

By default, this is enabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_USE_IDLE_CONNECTION_REAPER_ENABLED = + "http-async-client.netty.use-idle-connection-reaper-enabled"; + + /** + * Used to configure the proxy endpoint for {@link + * software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.Builder}. This flag only works + * when {@code s3.crt.enabled} is {@code false}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.Builder.html + */ + public static final String NETTY_PROXY_ENDPOINT = "http-async-client.netty.proxy-endpoint"; + + /** + * Used to configure the connection timeout in milliseconds for {@link + * software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncHttpClient.Builder}. This flag only + * works when {@code s3.crt.enabled} is {@code true}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.Builder.html + */ + public static final String CRT_CONNECTION_TIMEOUT_MS = + "http-async-client.crt.connection-timeout-ms"; + + /** + * Used to configure the max concurrency for {@link + * software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncHttpClient.Builder}. This flag only + * works when {@code s3.crt.enabled} is {@code true}. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.Builder.html + */ + public static final String CRT_MAX_CONCURRENCY = "http-async-client.crt.max-concurrency"; + + /** + * To fully benefit from the analytics-accelerator-s3 library where this S3 CRT client is used, it + * is recommended to initialize with higher concurrency. + * + *

For more details, see: https://github.com/awslabs/analytics-accelerator-s3 + */ + public static final int CRT_MAX_CONCURRENCY_DEFAULT = 500; + + private final Map httpAsyncClientProperties; + + public HttpAsyncClientProperties() { + this.httpAsyncClientProperties = Collections.emptyMap(); + } + + public HttpAsyncClientProperties(Map properties) { + this.httpAsyncClientProperties = + PropertyUtil.filterProperties(properties, key -> key.startsWith(CLIENT_PREFIX)); + } + + public void applyHttpAsyncClientConfigurations(T builder) { + NettyHttpAsyncClientConfigurations nettyConfigurations = + loadHttpAsyncClientConfigurations(NettyHttpAsyncClientConfigurations.class.getName()); + nettyConfigurations.configureHttpAsyncClientBuilder(builder); + } + + public void applyHttpAsyncClientConfigurations(T builder) { + CrtHttpAsyncClientConfigurations crtConfigurations = + loadHttpAsyncClientConfigurations(CrtHttpAsyncClientConfigurations.class.getName()); + crtConfigurations.configureHttpAsyncClientBuilder(builder); + } + + private T loadHttpAsyncClientConfigurations(String impl) { + try { + Object httpAsyncClientConfigurations = + DynMethods.builder("create") + .hiddenImpl(impl, Map.class) + .buildStaticChecked() + .invoke(httpAsyncClientProperties); + return (T) httpAsyncClientConfigurations; + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format("Cannot create %s to generate and configure the http client builder", impl), + e); + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/NettyHttpAsyncClientConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/NettyHttpAsyncClientConfigurations.java new file mode 100644 index 000000000000..8a986e342887 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/NettyHttpAsyncClientConfigurations.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws; + +import java.net.URI; +import java.time.Duration; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.util.PropertyUtil; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; + +class NettyHttpAsyncClientConfigurations { + private Long connectionTimeoutMs; + private Long socketTimeoutMs; + private Long acquisitionTimeoutMs; + private Long connectionMaxIdleTimeMs; + private Long connectionTimeToLiveMs; + private Integer maxConnections; + private Boolean tcpKeepAliveEnabled; + private Boolean useIdleConnectionReaperEnabled; + private String proxyEndpoint; + + private NettyHttpAsyncClientConfigurations() {} + + public void configureHttpAsyncClientBuilder( + T s3AsyncClientBuilder) { + NettyNioAsyncHttpClient.Builder nettyBuilder = NettyNioAsyncHttpClient.builder(); + configureNettyHttpAsyncClientBuilder(nettyBuilder); + s3AsyncClientBuilder.httpClientBuilder(nettyBuilder); + } + + private void initialize(Map httpAsyncClientProperties) { + this.connectionTimeoutMs = + PropertyUtil.propertyAsNullableLong( + 
httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_CONNECTION_TIMEOUT_MS); + this.socketTimeoutMs = + PropertyUtil.propertyAsNullableLong( + httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_SOCKET_TIMEOUT_MS); + this.acquisitionTimeoutMs = + PropertyUtil.propertyAsNullableLong( + httpAsyncClientProperties, + HttpAsyncClientProperties.NETTY_CONNECTION_ACQUISITION_TIMEOUT_MS); + this.connectionMaxIdleTimeMs = + PropertyUtil.propertyAsNullableLong( + httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_CONNECTION_MAX_IDLE_TIME_MS); + this.connectionTimeToLiveMs = + PropertyUtil.propertyAsNullableLong( + httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_CONNECTION_TIME_TO_LIVE_MS); + this.maxConnections = + PropertyUtil.propertyAsNullableInt( + httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_MAX_CONNECTIONS); + this.tcpKeepAliveEnabled = + PropertyUtil.propertyAsNullableBoolean( + httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_TCP_KEEP_ALIVE_ENABLED); + this.useIdleConnectionReaperEnabled = + PropertyUtil.propertyAsNullableBoolean( + httpAsyncClientProperties, + HttpAsyncClientProperties.NETTY_USE_IDLE_CONNECTION_REAPER_ENABLED); + this.proxyEndpoint = + PropertyUtil.propertyAsString( + httpAsyncClientProperties, HttpAsyncClientProperties.NETTY_PROXY_ENDPOINT, null); + } + + @VisibleForTesting + void configureNettyHttpAsyncClientBuilder(NettyNioAsyncHttpClient.Builder nettyBuilder) { + if (connectionTimeoutMs != null) { + nettyBuilder.connectionTimeout(Duration.ofMillis(connectionTimeoutMs)); + } + if (socketTimeoutMs != null) { + nettyBuilder.readTimeout(Duration.ofMillis(socketTimeoutMs)); + nettyBuilder.writeTimeout(Duration.ofMillis(socketTimeoutMs)); + } + if (acquisitionTimeoutMs != null) { + nettyBuilder.connectionAcquisitionTimeout(Duration.ofMillis(acquisitionTimeoutMs)); + } + if (connectionMaxIdleTimeMs != null) { + nettyBuilder.connectionMaxIdleTime(Duration.ofMillis(connectionMaxIdleTimeMs)); + } + if 
(connectionTimeToLiveMs != null) { + nettyBuilder.connectionTimeToLive(Duration.ofMillis(connectionTimeToLiveMs)); + } + if (maxConnections != null) { + nettyBuilder.maxConcurrency(maxConnections); + } + if (tcpKeepAliveEnabled != null) { + nettyBuilder.tcpKeepAlive(tcpKeepAliveEnabled); + } + if (useIdleConnectionReaperEnabled != null) { + nettyBuilder.useIdleConnectionReaper(useIdleConnectionReaperEnabled); + } + if (proxyEndpoint != null) { + nettyBuilder.proxyConfiguration( + ProxyConfiguration.builder() + .host(URI.create(proxyEndpoint).getHost()) + .port(URI.create(proxyEndpoint).getPort()) + .build()); + } + } + + public static NettyHttpAsyncClientConfigurations create(Map properties) { + NettyHttpAsyncClientConfigurations configurations = new NettyHttpAsyncClientConfigurations(); + configurations.initialize(properties); + return configurations; + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java index 197f79905f0f..973bbeac8415 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java @@ -19,31 +19,58 @@ package org.apache.iceberg.aws.s3; import java.io.IOException; +import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.Counter; +import org.apache.iceberg.metrics.MetricsContext; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; /** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */ -class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream { +class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream implements RangeReadable { private final S3SeekableInputStream 
delegate; + private final Counter readBytes; + private final Counter readOperations; AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) { + this(stream, MetricsContext.nullMetrics()); + } + + AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream, MetricsContext metrics) { this.delegate = stream; + this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES); + this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS); } @Override public int read() throws IOException { - return this.delegate.read(); + int nextByteValue = this.delegate.read(); + if (nextByteValue != -1) { + readBytes.increment(); + } + readOperations.increment(); + return nextByteValue; } @Override - public int read(byte[] b) throws IOException { - return this.delegate.read(b, 0, b.length); + public int read(byte[] b, int off, int len) throws IOException { + int bytesRead = this.delegate.read(b, off, len); + if (bytesRead > 0) { + readBytes.increment(bytesRead); + } + readOperations.increment(); + return bytesRead; } @Override - public int read(byte[] b, int off, int len) throws IOException { - return this.delegate.read(b, off, len); + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + this.delegate.readFully(position, buffer, offset, length); + } + + @Override + public int readTail(byte[] buffer, int offset, int length) throws IOException { + return this.delegate.readTail(buffer, offset, length); } @Override diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java index ed1c3a4879e3..9fba60046c94 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java @@ -22,8 +22,14 @@ import com.github.benmanes.caffeine.cache.Caffeine; import 
com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.List; +import javax.net.ssl.SSLException; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +45,16 @@ import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; +import software.amazon.s3.analyticsaccelerator.util.retry.DefaultRetryStrategyImpl; +import software.amazon.s3.analyticsaccelerator.util.retry.RetryPolicy; +import software.amazon.s3.analyticsaccelerator.util.retry.RetryStrategy; class AnalyticsAcceleratorUtil { private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class); + private static final List> RETRYABLE_EXCEPTIONS = + ImmutableList.of(SSLException.class, SocketTimeoutException.class, SocketException.class); + private static final int MAX_RETRIES = 3; private static final Cache, S3SeekableInputStreamFactory> STREAM_FACTORY_CACHE = @@ -54,9 +66,18 @@ class AnalyticsAcceleratorUtil { (key, factory, cause) -> close(factory)) .build(); + private static final RetryStrategy RETRY_STRATEGY = + new DefaultRetryStrategyImpl( + RetryPolicy.builder().handle(RETRYABLE_EXCEPTIONS).withMaxRetries(MAX_RETRIES).build()); + private AnalyticsAcceleratorUtil() {} public static SeekableInputStream newStream(S3InputFile inputFile) { + return newStream(inputFile, RETRY_STRATEGY); + } + + @VisibleForTesting + static SeekableInputStream newStream(S3InputFile inputFile, RetryStrategy retryStrategy) { S3URI uri = 
S3URI.of(inputFile.uri().bucket(), inputFile.uri().key()); HeadObjectResponse metadata = inputFile.getObjectMetadata(); OpenStreamInformation openStreamInfo = @@ -66,6 +87,7 @@ public static SeekableInputStream newStream(S3InputFile inputFile) { .contentLength(metadata.contentLength()) .etag(metadata.eTag()) .build()) + .retryStrategy(retryStrategy) .build(); S3SeekableInputStreamFactory factory = @@ -75,7 +97,7 @@ public static SeekableInputStream newStream(S3InputFile inputFile) { try { S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo); - return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream); + return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream, inputFile.metrics()); } catch (IOException e) { throw new RuntimeIOException( e, "Failed to create S3 analytics accelerator input stream for: %s", inputFile.uri()); @@ -85,7 +107,7 @@ public static SeekableInputStream newStream(S3InputFile inputFile) { private static S3SeekableInputStreamFactory createNewFactory( Pair cacheKey) { ConnectorConfiguration connectorConfiguration = - new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties()); + new ConnectorConfiguration(cacheKey.second().s3AnalyticsAcceleratorProperties()); S3SeekableInputStreamConfiguration streamConfiguration = S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); ObjectClientConfiguration objectClientConfiguration = diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java index 2dec40e7f897..a94630f9544c 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.iceberg.aws.AwsClientProperties; +import 
org.apache.iceberg.aws.HttpAsyncClientProperties; import org.apache.iceberg.aws.HttpClientProperties; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; @@ -28,11 +29,13 @@ class DefaultS3FileIOAwsClientFactory implements S3FileIOAwsClientFactory { private S3FileIOProperties s3FileIOProperties; private HttpClientProperties httpClientProperties; private AwsClientProperties awsClientProperties; + private HttpAsyncClientProperties httpAsyncClientProperties; DefaultS3FileIOAwsClientFactory() { this.s3FileIOProperties = new S3FileIOProperties(); this.httpClientProperties = new HttpClientProperties(); this.awsClientProperties = new AwsClientProperties(); + this.httpAsyncClientProperties = new HttpAsyncClientProperties(); } @Override @@ -40,6 +43,7 @@ public void initialize(Map properties) { this.s3FileIOProperties = new S3FileIOProperties(properties); this.awsClientProperties = new AwsClientProperties(properties); this.httpClientProperties = new HttpClientProperties(properties); + this.httpAsyncClientProperties = new HttpAsyncClientProperties(properties); } @Override @@ -66,16 +70,25 @@ public S3AsyncClient s3Async() { if (s3FileIOProperties.isS3CRTEnabled()) { return S3AsyncClient.crtBuilder() .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) + .applyMutation(httpAsyncClientProperties::applyHttpAsyncClientConfigurations) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) - .applyMutation(s3FileIOProperties::applyS3CrtConfigurations) + .applyMutation(s3FileIOProperties::applyServiceConfigurations) + .applyMutation( + b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b)) .build(); } return S3AsyncClient.builder() .applyMutation(awsClientProperties::applyClientRegionConfiguration) - .applyMutation(awsClientProperties::applyClientCredentialConfigurations) 
.applyMutation(awsClientProperties::applyLegacyMd5Plugin) + .applyMutation(httpAsyncClientProperties::applyHttpAsyncClientConfigurations) .applyMutation(s3FileIOProperties::applyEndpointConfigurations) + .applyMutation(s3FileIOProperties::applyServiceConfigurations) + .applyMutation( + b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b)) + .applyMutation(s3FileIOProperties::applySignerConfiguration) + .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations) + .applyMutation(s3FileIOProperties::applyUserAgentConfigurations) + .applyMutation(s3FileIOProperties::applyRetryConfigurations) .build(); } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3AccessGrantsPluginConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3AccessGrantsPluginConfigurations.java index a5f5ea6fa118..f53014b0988a 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3AccessGrantsPluginConfigurations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3AccessGrantsPluginConfigurations.java @@ -21,14 +21,14 @@ import java.util.Map; import org.apache.iceberg.util.PropertyUtil; import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin; -import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3BaseClientBuilder; class S3AccessGrantsPluginConfigurations { private boolean isS3AccessGrantsFallbackToIamEnabled; private S3AccessGrantsPluginConfigurations() {} - public void configureS3ClientBuilder(T builder) { + public > void configureS3ClientBuilder(T builder) { S3AccessGrantsPlugin s3AccessGrantsPlugin = S3AccessGrantsPlugin.builder().enableFallback(isS3AccessGrantsFallbackToIamEnabled).build(); builder.addPlugin(s3AccessGrantsPlugin); diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 6bf582f00bbb..28cea2436e66 100644 --- 
a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -50,7 +50,6 @@ import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition; import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; -import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.S3Configuration; import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -99,18 +98,7 @@ public class S3FileIOProperties implements Serializable { /** This property is used to specify if the S3 Async clients should be created using CRT. */ public static final String S3_CRT_ENABLED = "s3.crt.enabled"; - public static final boolean S3_CRT_ENABLED_DEFAULT = true; - - /** This property is used to specify the max concurrency for S3 CRT clients. */ - public static final String S3_CRT_MAX_CONCURRENCY = "s3.crt.max-concurrency"; - - /** - * To fully benefit from the analytics-accelerator-s3 library where this S3 CRT client is used, it - * is recommended to initialize with higher concurrency. - * - *

For more details, see: https://github.com/awslabs/analytics-accelerator-s3 - */ - public static final int S3_CRT_MAX_CONCURRENCY_DEFAULT = 500; + public static final boolean S3_CRT_ENABLED_DEFAULT = false; /** * The fallback-to-iam property allows users to customize whether or not they would like their @@ -525,9 +513,8 @@ public class S3FileIOProperties implements Serializable { private final String endpoint; private final boolean isRemoteSigningEnabled; private final boolean isS3AnalyticsAcceleratorEnabled; - private final Map s3AnalyticsacceleratorProperties; + private final Map s3AnalyticsAcceleratorProperties; private final boolean isS3CRTEnabled; - private final int s3CrtMaxConcurrency; private String writeStorageClass; private int s3RetryNumRetries; private long s3RetryMinWaitMs; @@ -573,9 +560,8 @@ public S3FileIOProperties() { this.s3DirectoryBucketListPrefixAsDirectory = S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT; this.isS3AnalyticsAcceleratorEnabled = S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT; - this.s3AnalyticsacceleratorProperties = Maps.newHashMap(); + this.s3AnalyticsAcceleratorProperties = Maps.newHashMap(); this.isS3CRTEnabled = S3_CRT_ENABLED_DEFAULT; - this.s3CrtMaxConcurrency = S3_CRT_MAX_CONCURRENCY_DEFAULT; this.allProperties = Maps.newHashMap(); ValidationException.check( @@ -691,13 +677,10 @@ public S3FileIOProperties(Map properties) { this.isS3AnalyticsAcceleratorEnabled = PropertyUtil.propertyAsBoolean( properties, S3_ANALYTICS_ACCELERATOR_ENABLED, S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT); - this.s3AnalyticsacceleratorProperties = + this.s3AnalyticsAcceleratorProperties = PropertyUtil.propertiesWithPrefix(properties, S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX); this.isS3CRTEnabled = PropertyUtil.propertyAsBoolean(properties, S3_CRT_ENABLED, S3_CRT_ENABLED_DEFAULT); - this.s3CrtMaxConcurrency = - PropertyUtil.propertyAsInt( - properties, S3_CRT_MAX_CONCURRENCY, S3_CRT_MAX_CONCURRENCY_DEFAULT); ValidationException.check( 
keyIdAccessKeyBothConfigured(), @@ -816,18 +799,14 @@ public boolean isS3AnalyticsAcceleratorEnabled() { return isS3AnalyticsAcceleratorEnabled; } - public Map s3AnalyticsacceleratorProperties() { - return s3AnalyticsacceleratorProperties; + public Map s3AnalyticsAcceleratorProperties() { + return s3AnalyticsAcceleratorProperties; } public boolean isS3CRTEnabled() { return isS3CRTEnabled; } - public int s3CrtMaxConcurrency() { - return s3CrtMaxConcurrency; - } - public String endpoint() { return this.endpoint; } @@ -976,16 +955,17 @@ private AwsCredentialsProvider getCredentialsProvider(AwsClientProperties awsCli } /** - * Configure services settings for an S3 client. The settings include: s3DualStack, - * crossRegionAccessEnabled, s3UseArnRegion, s3PathStyleAccess, and s3Acceleration + * Configure services settings for S3 sync and S3 async clients. The settings include: + * s3DualStack, crossRegionAccessEnabled, s3UseArnRegion, s3PathStyleAccess, and s3Acceleration * *

Sample usage: * *

    *     S3Client.builder().applyMutation(s3FileIOProperties::applyS3ServiceConfigurations)
+   *     S3AsyncClient.builder().applyMutation(s3FileIOProperties::applyServiceConfigurations)
    * 
*/ - public void applyServiceConfigurations(T builder) { + public > void applyServiceConfigurations(T builder) { builder .dualstackEnabled(isDualStackEnabled) .crossRegionAccessEnabled(isCrossRegionAccessEnabled) @@ -998,15 +978,33 @@ public void applyServiceConfigurations(T builder) { } /** - * Configure a signer for an S3 client. + * Configure services settings for an S3 CRT client. The settings include: + * crossRegionAccessEnabled, s3PathStyleAccess, and s3Acceleration + * + *

Sample usage: + * + *

+   *     S3AsyncClient.crtBuilder().applyMutation(s3FileIOProperties::applyServiceConfigurations)
+   * 
+ */ + public void applyServiceConfigurations(T builder) { + builder + .crossRegionAccessEnabled(isCrossRegionAccessEnabled) + .forcePathStyle(isPathStyleAccess) + .accelerate(isAccelerationEnabled); + } + + /** + * Configure a signer for S3 sync and S3 async clients. * *

Sample usage: * *

    *     S3Client.builder().applyMutation(s3FileIOProperties::applyS3SignerConfiguration)
+   *     S3AsyncClient.builder().applyMutation(s3FileIOProperties::applySignerConfiguration)
    * 
*/ - public void applySignerConfiguration(T builder) { + public > void applySignerConfiguration(T builder) { if (isRemoteSigningEnabled) { ClientOverrideConfiguration.Builder configBuilder = null != builder.overrideConfiguration() @@ -1052,15 +1050,16 @@ public void applyEndpointConfigurations(T bu } /** - * Override the retry configurations for an S3 client. + * Override the retry configurations for S3 sync and async clients. * *

Sample usage: * *

    *     S3Client.builder().applyMutation(s3FileIOProperties::applyRetryConfigurations)
+   *     S3AsyncClient.builder().applyMutation(s3FileIOProperties::applyRetryConfigurations)
    * 
*/ - public void applyRetryConfigurations(T builder) { + public > void applyRetryConfigurations(T builder) { ClientOverrideConfiguration.Builder configBuilder = null != builder.overrideConfiguration() ? builder.overrideConfiguration().toBuilder() @@ -1117,15 +1116,16 @@ public void applyRetryConfigurations(T builder) { } /** - * Add the S3 Access Grants Plugin for an S3 client. + * Add the S3 Access Grants Plugin for S3 sync and async clients. * *

Sample usage: * *

    *     S3Client.builder().applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
+   *     S3AsyncClient.builder().applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
    * 
*/ - public void applyS3AccessGrantsConfigurations(T builder) { + public > void applyS3AccessGrantsConfigurations(T builder) { if (isS3AccessGrantsEnabled) { S3AccessGrantsPluginConfigurations s3AccessGrantsPluginConfigurations = loadSdkPluginConfigurations( @@ -1134,7 +1134,7 @@ public void applyS3AccessGrantsConfigurations(T buil } } - public void applyUserAgentConfigurations(T builder) { + public > void applyUserAgentConfigurations(T builder) { ClientOverrideConfiguration.Builder configBuilder = null != builder.overrideConfiguration() ? builder.overrideConfiguration().toBuilder() @@ -1145,10 +1145,6 @@ public void applyUserAgentConfigurations(T builder) .build()); } - public S3CrtAsyncClientBuilder applyS3CrtConfigurations(S3CrtAsyncClientBuilder builder) { - return builder.maxConcurrency(s3CrtMaxConcurrency()); - } - /** * Dynamically load the http client builder to avoid runtime deps requirements of any optional SDK * Plugins diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java index fe95f9364673..1f254ae1003d 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java +++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java @@ -95,7 +95,7 @@ public void testS3AsyncClientWithCrtDisabled() { } @Test - public void testS3AsyncClientDefaultIsCrt() { + public void testS3AsyncClientDefaultIsNotCrt() { assertThat( AwsClientFactories.from( ImmutableMap.of( @@ -106,7 +106,7 @@ public void testS3AsyncClientDefaultIsCrt() { AwsClientProperties.CLIENT_REGION, "us-east-1")) .s3Async()) - .isInstanceOf(DefaultS3CrtAsyncClient.class); + .isNotInstanceOf(DefaultS3CrtAsyncClient.class); } @Test diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestHttpAsyncClientConfigurations.java b/aws/src/test/java/org/apache/iceberg/aws/TestHttpAsyncClientConfigurations.java new file mode 100644 index 000000000000..bbe80270cc58 --- /dev/null +++ 
b/aws/src/test/java/org/apache/iceberg/aws/TestHttpAsyncClientConfigurations.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws; + +import java.time.Duration; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; +import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; + +public class TestHttpAsyncClientConfigurations { + + @Test + public void testNettyOverrideConfigurations() { + Map properties = Maps.newHashMap(); + properties.put(HttpAsyncClientProperties.NETTY_CONNECTION_TIMEOUT_MS, "200"); + properties.put(HttpAsyncClientProperties.NETTY_SOCKET_TIMEOUT_MS, "100"); + properties.put(HttpAsyncClientProperties.NETTY_CONNECTION_ACQUISITION_TIMEOUT_MS, "101"); + properties.put(HttpAsyncClientProperties.NETTY_CONNECTION_MAX_IDLE_TIME_MS, "102"); + properties.put(HttpAsyncClientProperties.NETTY_CONNECTION_TIME_TO_LIVE_MS, "103"); + 
properties.put(HttpAsyncClientProperties.NETTY_MAX_CONNECTIONS, "104"); + properties.put(HttpAsyncClientProperties.NETTY_TCP_KEEP_ALIVE_ENABLED, "true"); + properties.put(HttpAsyncClientProperties.NETTY_USE_IDLE_CONNECTION_REAPER_ENABLED, "false"); + properties.put(HttpAsyncClientProperties.NETTY_PROXY_ENDPOINT, "http://proxy:8080"); + + NettyHttpAsyncClientConfigurations nettyConfigurations = + NettyHttpAsyncClientConfigurations.create(properties); + NettyNioAsyncHttpClient.Builder nettyBuilder = NettyNioAsyncHttpClient.builder(); + NettyNioAsyncHttpClient.Builder spyNettyBuilder = Mockito.spy(nettyBuilder); + + nettyConfigurations.configureNettyHttpAsyncClientBuilder(spyNettyBuilder); + + Mockito.verify(spyNettyBuilder).connectionTimeout(Duration.ofMillis(200)); + Mockito.verify(spyNettyBuilder).readTimeout(Duration.ofMillis(100)); + Mockito.verify(spyNettyBuilder).writeTimeout(Duration.ofMillis(100)); + Mockito.verify(spyNettyBuilder).connectionAcquisitionTimeout(Duration.ofMillis(101)); + Mockito.verify(spyNettyBuilder).connectionMaxIdleTime(Duration.ofMillis(102)); + Mockito.verify(spyNettyBuilder).connectionTimeToLive(Duration.ofMillis(103)); + Mockito.verify(spyNettyBuilder).maxConcurrency(104); + Mockito.verify(spyNettyBuilder).tcpKeepAlive(true); + Mockito.verify(spyNettyBuilder).useIdleConnectionReaper(false); + Mockito.verify(spyNettyBuilder).proxyConfiguration(Mockito.any(ProxyConfiguration.class)); + } + + @Test + public void testNettyDefaultConfigurations() { + NettyHttpAsyncClientConfigurations nettyConfigurations = + NettyHttpAsyncClientConfigurations.create(Maps.newHashMap()); + NettyNioAsyncHttpClient.Builder nettyBuilder = NettyNioAsyncHttpClient.builder(); + NettyNioAsyncHttpClient.Builder spyNettyBuilder = Mockito.spy(nettyBuilder); + + nettyConfigurations.configureNettyHttpAsyncClientBuilder(spyNettyBuilder); + + Mockito.verify(spyNettyBuilder, Mockito.never()).connectionTimeout(Mockito.any(Duration.class)); + Mockito.verify(spyNettyBuilder, 
Mockito.never()).readTimeout(Mockito.any(Duration.class)); + Mockito.verify(spyNettyBuilder, Mockito.never()).writeTimeout(Mockito.any(Duration.class)); + Mockito.verify(spyNettyBuilder, Mockito.never()) + .connectionAcquisitionTimeout(Mockito.any(Duration.class)); + Mockito.verify(spyNettyBuilder, Mockito.never()) + .connectionMaxIdleTime(Mockito.any(Duration.class)); + Mockito.verify(spyNettyBuilder, Mockito.never()) + .connectionTimeToLive(Mockito.any(Duration.class)); + Mockito.verify(spyNettyBuilder, Mockito.never()).maxConcurrency(Mockito.anyInt()); + Mockito.verify(spyNettyBuilder, Mockito.never()).tcpKeepAlive(Mockito.anyBoolean()); + Mockito.verify(spyNettyBuilder, Mockito.never()).useIdleConnectionReaper(Mockito.anyBoolean()); + Mockito.verify(spyNettyBuilder, Mockito.never()) + .proxyConfiguration(Mockito.any(ProxyConfiguration.class)); + } + + @Test + public void testCrtOverrideConfigurations() { + Map properties = Maps.newHashMap(); + properties.put(HttpAsyncClientProperties.CRT_CONNECTION_TIMEOUT_MS, "200"); + properties.put(HttpAsyncClientProperties.CRT_MAX_CONCURRENCY, "50"); + + CrtHttpAsyncClientConfigurations crtConfigurations = + CrtHttpAsyncClientConfigurations.create(properties); + S3CrtAsyncClientBuilder crtBuilder = Mockito.mock(S3CrtAsyncClientBuilder.class); + + crtConfigurations.configureCrtHttpAsyncClientBuilder(crtBuilder); + + Mockito.verify(crtBuilder).httpConfiguration(Mockito.any(S3CrtHttpConfiguration.class)); + Mockito.verify(crtBuilder).maxConcurrency(50); + } + + @Test + public void testCrtDefaultConfigurations() { + CrtHttpAsyncClientConfigurations crtConfigurations = + CrtHttpAsyncClientConfigurations.create(Maps.newHashMap()); + S3CrtAsyncClientBuilder crtBuilder = Mockito.mock(S3CrtAsyncClientBuilder.class); + + crtConfigurations.configureCrtHttpAsyncClientBuilder(crtBuilder); + + Mockito.verify(crtBuilder, Mockito.never()) + .httpConfiguration(Mockito.any(S3CrtHttpConfiguration.class)); + Mockito.verify(crtBuilder) + 
.maxConcurrency(HttpAsyncClientProperties.CRT_MAX_CONCURRENCY_DEFAULT); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestHttpAsyncClientProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestHttpAsyncClientProperties.java new file mode 100644 index 000000000000..4ac63eaf70a0 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpAsyncClientProperties.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.aws; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; + +public class TestHttpAsyncClientProperties { + + @Test + public void testNettyAsyncClientConfiguration() { + Map properties = Maps.newHashMap(); + properties.put(HttpAsyncClientProperties.NETTY_CONNECTION_TIMEOUT_MS, "200"); + properties.put(HttpAsyncClientProperties.NETTY_MAX_CONNECTIONS, "100"); + + HttpAsyncClientProperties httpAsyncClientProperties = new HttpAsyncClientProperties(properties); + S3AsyncClientBuilder s3AsyncClientBuilder = Mockito.mock(S3AsyncClientBuilder.class); + + httpAsyncClientProperties.applyHttpAsyncClientConfigurations(s3AsyncClientBuilder); + + Mockito.verify(s3AsyncClientBuilder).httpClientBuilder(Mockito.any()); + } + + @Test + public void testCrtAsyncClientConfiguration() { + Map properties = Maps.newHashMap(); + properties.put(HttpAsyncClientProperties.CRT_CONNECTION_TIMEOUT_MS, "200"); + properties.put(HttpAsyncClientProperties.CRT_MAX_CONCURRENCY, "50"); + + HttpAsyncClientProperties httpAsyncClientProperties = new HttpAsyncClientProperties(properties); + S3CrtAsyncClientBuilder s3CrtAsyncClientBuilder = Mockito.mock(S3CrtAsyncClientBuilder.class); + + httpAsyncClientProperties.applyHttpAsyncClientConfigurations(s3CrtAsyncClientBuilder); + + Mockito.verify(s3CrtAsyncClientBuilder).maxConcurrency(Mockito.any()); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java index 76f47adb17e8..d9cb2871f825 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java @@ -28,6 +28,7 @@ class StaticClientFactory 
implements AwsClientFactory { static S3Client client; + static S3AsyncClient asyncClient; @Override public S3Client s3() { @@ -36,7 +37,7 @@ public S3Client s3() { @Override public S3AsyncClient s3Async() { - return null; + return asyncClient; } @Override diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index b7e9be67b2ac..60775a4cb915 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -126,9 +126,6 @@ public void testS3FileIOPropertiesDefaultValues() { assertThat(Collections.emptyMap()).isEqualTo(s3FileIOProperties.bucketToAccessPointMapping()); - assertThat(S3FileIOProperties.S3_CRT_MAX_CONCURRENCY_DEFAULT) - .isEqualTo(s3FileIOProperties.s3CrtMaxConcurrency()); - assertThat(S3FileIOProperties.S3_CRT_ENABLED_DEFAULT) .isEqualTo(s3FileIOProperties.isS3CRTEnabled()); @@ -279,11 +276,6 @@ public void testS3FileIOProperties() { assertThat(map).containsEntry(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING"); - assertThat(map) - .containsEntry( - S3FileIOProperties.S3_CRT_MAX_CONCURRENCY, - String.valueOf(s3FileIOProperties.s3CrtMaxConcurrency())); - assertThat(map) .containsEntry( S3FileIOProperties.S3_CRT_ENABLED, String.valueOf(s3FileIOProperties.isS3CRTEnabled())); @@ -439,7 +431,6 @@ private Map getTestProperties() { map.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true"); map.put(S3FileIOProperties.REMOTE_SIGNING_ENABLED, "true"); map.put(S3FileIOProperties.WRITE_STORAGE_CLASS, "INTELLIGENT_TIERING"); - map.put(S3FileIOProperties.S3_CRT_MAX_CONCURRENCY, "200"); map.put(S3FileIOProperties.S3_CRT_ENABLED, "false"); map.put(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, "true"); return map; @@ -543,17 +534,6 @@ public void testApplyUserAgentConfigurations() { 
.overrideConfiguration(Mockito.any(ClientOverrideConfiguration.class)); } - @Test - public void testApplyS3CrtConfigurations() { - Map properties = Maps.newHashMap(); - S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); - S3CrtAsyncClientBuilder mockS3CrtAsyncClientBuilder = - Mockito.mock(S3CrtAsyncClientBuilder.class); - s3FileIOProperties.applyS3CrtConfigurations(mockS3CrtAsyncClientBuilder); - - Mockito.verify(mockS3CrtAsyncClientBuilder).maxConcurrency(Mockito.any(Integer.class)); - } - @Test public void testApplyRetryConfiguration() { Map properties = Maps.newHashMap(); diff --git a/build.gradle b/build.gradle index 6e888eebcb92..e901097f0e8c 100644 --- a/build.gradle +++ b/build.gradle @@ -472,6 +472,7 @@ project(':iceberg-aws') { compileOnly(libs.awssdk.s3accessgrants) compileOnly("software.amazon.awssdk:url-connection-client") compileOnly("software.amazon.awssdk:apache-client") + compileOnly("software.amazon.awssdk:netty-nio-client") compileOnly("software.amazon.awssdk:auth") compileOnly("software.amazon.awssdk:http-auth-aws-crt") compileOnly("software.amazon.awssdk:s3") diff --git a/docs/docs/aws.md b/docs/docs/aws.md index b6123a6ae977..dc3a0e3398cf 100644 --- a/docs/docs/aws.md +++ b/docs/docs/aws.md @@ -589,8 +589,7 @@ The Analytics Accelerator Library can work with either the [S3 CRT client](https | Property | Default | Description | |------------------------|---------|--------------------------------------------------------------| -| s3.crt.enabled | `true` | Controls if the S3 Async clients should be created using CRT | -| s3.crt.max-concurrency | `500` | Max concurrency for S3 CRT clients | +| s3.crt.enabled | `false` | Controls if the S3 Async clients should be created using CRT | Additional library specific configurations are organized into the following sections: diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d7e41f597abc..693fba5f8e3a 100644 --- a/gradle/libs.versions.toml +++ 
b/gradle/libs.versions.toml @@ -22,7 +22,7 @@ [versions] activation = "1.1.1" aliyun-sdk-oss = "3.10.2" -analyticsaccelerator = "1.0.0" +analyticsaccelerator = "1.2.1" antlr = "4.9.3" antlr413 = "4.13.1" # For Spark 4.0 support aircompressor = "0.27" diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index a811404b14f7..cbcd6f3ccea2 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -114,6 +114,7 @@ project(':iceberg-kafka-connect:iceberg-kafka-connect-runtime') { implementation project(':iceberg-aws') implementation platform(libs.awssdk.bom) implementation 'software.amazon.awssdk:apache-client' + implementation 'software.amazon.awssdk:netty-nio-client' implementation 'software.amazon.awssdk:auth' implementation "software.amazon.awssdk:http-auth-aws-crt" implementation 'software.amazon.awssdk:iam'