diff --git a/api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java b/api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java
new file mode 100644
index 000000000000..7de5349a94bb
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/** A single byte range of a file, paired with the future that is completed with its bytes. */
+public class ParquetObjectRange {
+
+  private CompletableFuture<ByteBuffer> byteBuffer;
+  private long offset;
+  private int length;
+
+  public ParquetObjectRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) {
+    this.byteBuffer = byteBuffer;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public CompletableFuture<ByteBuffer> getByteBuffer() {
+    return byteBuffer;
+  }
+
+  public void setByteBuffer(CompletableFuture<ByteBuffer> byteBuffer) {
+    this.byteBuffer = byteBuffer;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java b/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
index 290e9661da69..c7081a4009f9 100644
--- a/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
+++ b/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
 
 /**
  * {@code SeekableInputStream} is an interface with the methods needed to read data from a file or
@@ -43,4 +46,24 @@ public abstract class SeekableInputStream extends InputStream {
   * @throws IOException If the underlying stream throws IOException
   */
  public abstract void seek(long newPos) throws IOException;
+
+  /**
+   * Reads a set of file ranges, completing each range's future once its buffer has been filled.
+   *
+   * @param ranges ranges to read
+   * @param allocate function used to allocate a {@link ByteBuffer} for each range
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public void readVectored(List<ParquetObjectRange> ranges, IntFunction<ByteBuffer> allocate)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Default SeekableInputStream implementation does not support vectored reads");
+  }
+
+  /** Returns whether this stream can serve {@link #readVectored(List, IntFunction)} calls. */
+  public boolean readVectoredAvailable(IntFunction<ByteBuffer> allocate) {
+    return false;
+  }
 }
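For reviewers, a minimal caller-side sketch of the new vectored-read contract. Only `readVectoredAvailable`, `readVectored`, and `ParquetObjectRange` come from this diff; the stream and the `fileLength`/`len` arguments are placeholders, so treat this as a hedged illustration rather than code from the PR.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import org.apache.iceberg.io.ParquetObjectRange;
import org.apache.iceberg.io.SeekableInputStream;

class VectoredReadSketch {
  // Reads the last `len` bytes of a file with a single vectored call.
  static ByteBuffer readTail(SeekableInputStream in, long fileLength, int len) throws Exception {
    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
    if (!in.readVectoredAvailable(allocate)) {
      throw new UnsupportedOperationException("stream does not support vectored reads");
    }
    CompletableFuture<ByteBuffer> tail = new CompletableFuture<>();
    in.readVectored(List.of(new ParquetObjectRange(tail, fileLength - len, len)), allocate);
    return tail.join(); // completes once the range has been fetched
  }
}
```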
diff --git a/aws-bundle/build.gradle b/aws-bundle/build.gradle
index 9e937aa0edb7..d86a8d2a08a1 100644
--- a/aws-bundle/build.gradle
+++ b/aws-bundle/build.gradle
@@ -39,7 +39,7 @@ project(":iceberg-aws-bundle") {
     implementation "software.amazon.awssdk:dynamodb"
     implementation "software.amazon.awssdk:lakeformation"
 
-    implementation libs.analyticsaccelerator.s3
+    implementation(libs.analyticsaccelerator.s3)
   }
 
   shadowJar {
@@ -52,10 +52,6 @@ project(":iceberg-aws-bundle") {
       include 'NOTICE'
     }
 
-    dependencies {
-      exclude(dependency('org.slf4j:slf4j-api'))
-    }
-
     // relocate AWS-specific versions
     relocate 'org.apache.http', 'org.apache.iceberg.aws.shaded.org.apache.http'
     relocate 'io.netty', 'org.apache.iceberg.aws.shaded.io.netty'
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
index 177a627efcab..fecb34df7bf3 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java
@@ -39,6 +39,8 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 import software.amazon.awssdk.core.metrics.CoreMetric;
 import software.amazon.awssdk.metrics.MetricCollector;
@@ -156,8 +158,9 @@ public void testCheckCommitStatusAfterRetries() {
         .isEqualTo(2);
   }
 
-  @Test
-  public void testNoRetryAwarenessCorruptsTable() {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testNoRetryAwarenessCorruptsTable(Map<String, String> aalProperties) {
     // This test exists to replicate the issue the prior test validates the fix for
     // See https://github.com/apache/iceberg/issues/7151
     String namespace = createNamespace();
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java
index 90557720fa0e..17dc42749fdc 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java
@@ -26,6 +26,8 @@
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 
@@ -73,4 +75,16 @@ public static S3Client createS3Client(MinIOContainer container, boolean legacyMd5PluginEnabled) {
     builder.forcePathStyle(true); // OSX won't resolve subdomains
     return builder.build();
   }
+
+  public static S3AsyncClient createS3AsyncClient(MinIOContainer container) {
+    URI uri = URI.create(container.getS3URL());
+    S3AsyncClientBuilder builder = S3AsyncClient.builder();
+    builder.credentialsProvider(
+        StaticCredentialsProvider.create(
+            AwsBasicCredentials.create(container.getUserName(), container.getPassword())));
+    builder.applyMutation(mutator -> mutator.endpointOverride(uri));
+    builder.region(Region.US_EAST_1);
+    builder.forcePathStyle(true); // OSX won't resolve subdomains
+    return builder.build();
+  }
 }
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java
index 8700da07c449..64b93e6454ee 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java
@@ -18,8 +18,20 @@
  */
 package org.apache.iceberg.aws.s3;
 
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.params.provider.Arguments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class S3TestUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(S3TestUtil.class);
+
   private S3TestUtil() {}
 
   public static String getBucketFromUri(String s3Uri) {
@@ -29,4 +41,38 @@ public static String getBucketFromUri(String s3Uri) {
   public static String getKeyFromUri(String s3Uri) {
     return new S3URI(s3Uri).key();
   }
+
+  /**
+   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+   *
+   * @param properties properties to probe
+   * @param message reason logged and reported when the test is skipped
+   */
+  public static void skipIfAnalyticsAcceleratorEnabled(
+      S3FileIOProperties properties, String message) {
+    boolean isAcceleratorEnabled = properties.isS3AnalyticsAcceleratorEnabled();
+    if (isAcceleratorEnabled) {
+      LOG.warn(message);
+    }
+    assumeThat(!isAcceleratorEnabled).describedAs(message).isTrue();
+  }
+
+  public static Stream<Arguments> analyticsAcceleratorLibraryProperties() {
+    return listAnalyticsAcceleratorLibraryProperties().stream().map(Arguments::of);
+  }
+
+  public static List<Map<String, String>> listAnalyticsAcceleratorLibraryProperties() {
+    return List.of(
+        ImmutableMap.of(
+            S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, Boolean.toString(true)),
+        ImmutableMap.of(
+            S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, Boolean.toString(false)));
+  }
+
+  public static Map<String, String> mergeProperties(
+      Map<String, String> aalProperties, Map<String, String> testProperties) {
+    return ImmutableMap.<String, String>builder()
+        .putAll(aalProperties)
+        .putAll(testProperties)
+        .build();
+  }
 }
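A hedged sketch of how the provider pair above is meant to be consumed; this mirrors the pattern the tests below use, with `myTest` as a placeholder name and the classes assumed to live in `org.apache.iceberg.aws.s3`.

```java
import java.util.Map;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class ExampleAalParameterizedTest {
  // Runs once with the accelerator enabled and once with it disabled.
  @ParameterizedTest
  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
  void myTest(Map<String, String> aalProperties) {
    S3FileIOProperties props = new S3FileIOProperties(aalProperties);
    // Tests that cannot run under the accelerator opt out explicitly:
    S3TestUtil.skipIfAnalyticsAcceleratorEnabled(props, "not supported with AAL");
  }
}
```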
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java
index f98d1a3d4471..e514e189d070 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.aws.s3;
 
+import static org.apache.iceberg.aws.s3.S3TestUtil.listAnalyticsAcceleratorLibraryProperties;
+import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -27,18 +29,28 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import javax.net.ssl.SSLException;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
@@ -59,7 +71,18 @@ public void setupTest() {
   }
 
   @Override
-  S3InputStream newInputStream(S3Client s3Client, S3URI uri) {
+  SeekableInputStream newInputStream(
+      S3Client s3Client,
+      S3AsyncClient s3AsyncClient,
+      S3URI uri,
+      Map<String, String> aalProperties) {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) {
+      PrefixedS3Client client =
+          new PrefixedS3Client("s3", aalProperties, () -> s3Client, () -> s3AsyncClient);
+      return AnalyticsAcceleratorUtil.newStream(
+          S3InputFile.fromLocation(uri.location(), client, MetricsContext.nullMetrics()));
+    }
     return new S3InputStream(s3Client, uri) {
       @Override
       void resetForRetry() throws IOException {
@@ -70,64 +93,132 @@ void resetForRetry() throws IOException {
   }
 
   @ParameterizedTest
-  @MethodSource("retryableExceptions")
-  public void testReadWithFlakyStreamRetrySucceed(IOException exception) throws Exception {
-    testRead(flakyStreamClient(new AtomicInteger(3), exception));
+  @MethodSource("aalPropertiesAndRetryableExceptions")
+  public void testReadWithFlakyStreamRetrySucceed(
+      Map<String, String> aalProperties, IOException exception) throws Exception {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        s3FileIOProperties,
+        "Analytics Accelerator Library does not support retries at read level");
+    testRead(
+        flakyStreamClient(new AtomicInteger(3), exception),
+        flakyStreamAsyncClient(new AtomicInteger(3), exception),
+        aalProperties);
     assertThat(resetForRetryCounter.get()).isEqualTo(2);
   }
 
   @ParameterizedTest
-  @MethodSource("retryableExceptions")
-  public void testReadWithFlakyStreamExhaustedRetries(IOException exception) {
-    assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(5), exception)))
+  @MethodSource("aalPropertiesAndRetryableExceptions")
+  public void testReadWithFlakyStreamExhaustedRetries(
+      Map<String, String> aalProperties, IOException exception) {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        s3FileIOProperties,
+        "Analytics Accelerator Library does not support retries at read level");
+    assertThatThrownBy(
+            () ->
+                testRead(
+                    flakyStreamClient(new AtomicInteger(5), exception),
+                    flakyStreamAsyncClient(new AtomicInteger(5), exception),
+                    aalProperties))
         .isInstanceOf(exception.getClass())
         .hasMessage(exception.getMessage());
     assertThat(resetForRetryCounter.get()).isEqualTo(3);
   }
 
   @ParameterizedTest
-  @MethodSource("nonRetryableExceptions")
-  public void testReadWithFlakyStreamNonRetryableException(IOException exception) {
-    assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(3), exception)))
+  @MethodSource("aalPropertiesAndNonRetryableExceptions")
+  public void testReadWithFlakyStreamNonRetryableException(
+      Map<String, String> aalProperties, IOException exception) {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        s3FileIOProperties, "Analytics Accelerator Library wraps IOExceptions differently");
+    assertThatThrownBy(
+            () ->
+                testRead(
+                    flakyStreamClient(new AtomicInteger(3), exception),
+                    flakyStreamAsyncClient(new AtomicInteger(3), exception),
+                    aalProperties))
         .isInstanceOf(exception.getClass())
         .hasMessage(exception.getMessage());
     assertThat(resetForRetryCounter.get()).isEqualTo(0);
   }
 
   @ParameterizedTest
-  @MethodSource("retryableExceptions")
-  public void testSeekWithFlakyStreamRetrySucceed(IOException exception) throws Exception {
-    testSeek(flakyStreamClient(new AtomicInteger(3), exception));
+  @MethodSource("aalPropertiesAndRetryableExceptions")
+  public void testSeekWithFlakyStreamRetrySucceed(
+      Map<String, String> aalProperties, IOException exception) throws Exception {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        s3FileIOProperties,
+        "Analytics Accelerator Library does not support retries at read level");
+    testSeek(
+        flakyStreamClient(new AtomicInteger(3), exception),
+        flakyStreamAsyncClient(new AtomicInteger(3), exception),
+        aalProperties);
     assertThat(resetForRetryCounter.get()).isEqualTo(2);
   }
 
   @ParameterizedTest
-  @MethodSource("retryableExceptions")
-  public void testSeekWithFlakyStreamExhaustedRetries(IOException exception) {
-    assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(5), exception)))
+  @MethodSource("aalPropertiesAndRetryableExceptions")
+  public void testSeekWithFlakyStreamExhaustedRetries(
+      Map<String, String> aalProperties, IOException exception) {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        s3FileIOProperties,
+        "Analytics Accelerator Library does not support retries at read level");
+    assertThatThrownBy(
+            () ->
+                testSeek(
+                    flakyStreamClient(new AtomicInteger(5), exception),
+                    flakyStreamAsyncClient(new AtomicInteger(5), exception),
+                    aalProperties))
        .isInstanceOf(exception.getClass())
        .hasMessage(exception.getMessage());
     assertThat(resetForRetryCounter.get()).isEqualTo(3);
   }
 
   @ParameterizedTest
-  @MethodSource("nonRetryableExceptions")
-  public void testSeekWithFlakyStreamNonRetryableException(IOException exception) {
-    assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(3), exception)))
+  @MethodSource("aalPropertiesAndNonRetryableExceptions")
+  public void testSeekWithFlakyStreamNonRetryableException(
+      Map<String, String> aalProperties, IOException exception) {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        s3FileIOProperties, "Analytics Accelerator Library wraps IOExceptions differently");
+    assertThatThrownBy(
+            () ->
+                testSeek(
+                    flakyStreamClient(new AtomicInteger(3), exception),
+                    flakyStreamAsyncClient(new AtomicInteger(3), exception),
+                    aalProperties))
         .isInstanceOf(exception.getClass())
         .hasMessage(exception.getMessage());
     assertThat(resetForRetryCounter.get()).isEqualTo(0);
   }
 
-  private static Stream<Arguments> retryableExceptions() {
-    return Stream.of(
-        Arguments.of(
-            new SocketTimeoutException("socket timeout exception"),
-            new SSLException("some ssl exception")));
+  private static Stream<Arguments> aalPropertiesAndRetryableExceptions() {
+    return aalPropertiesAndExceptions(true);
+  }
+
+  private static Stream<Arguments> aalPropertiesAndNonRetryableExceptions() {
+    return aalPropertiesAndExceptions(false);
+  }
+
+  private static Stream<Arguments> aalPropertiesAndExceptions(boolean retryable) {
+    List<IOException> exceptions = retryable ? retryableExceptions() : nonRetryableExceptions();
+    List<Arguments> results = Lists.newArrayList();
+    for (Map<String, String> aalProperties : listAnalyticsAcceleratorLibraryProperties()) {
+      for (IOException exception : exceptions) {
+        results.add(Arguments.of(aalProperties, exception));
+      }
+    }
+    return results.stream();
+  }
+
+  private static List<IOException> retryableExceptions() {
+    return Arrays.asList(
+        new SocketTimeoutException("socket timeout exception"),
+        new SSLException("some ssl exception"));
   }
 
-  private static Stream<Arguments> nonRetryableExceptions() {
-    return Stream.of(Arguments.of(new IOException("some generic non-retryable IO exception")));
+  private static List<IOException> nonRetryableExceptions() {
+    return Arrays.asList(new IOException("some generic non-retryable IO exception"));
   }
 
   private S3ClientWrapper flakyStreamClient(AtomicInteger counter, IOException failure) {
@@ -138,6 +229,14 @@ private S3ClientWrapper flakyStreamClient(AtomicInteger counter, IOException failure) {
     return flakyClient;
   }
 
+  private S3AsyncClientWrapper flakyStreamAsyncClient(AtomicInteger counter, IOException failure) {
+    S3AsyncClientWrapper flakyClient = spy(new S3AsyncClientWrapper(s3AsyncClient()));
+    doAnswer(invocation -> new FlakyInputStream(invocation.callRealMethod(), counter, failure))
+        .when(flakyClient)
+        .getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class));
+    return flakyClient;
+  }
+
   /** Wrapper for S3 client, used to mock the final class DefaultS3Client */
   public static class S3ClientWrapper implements S3Client {
 
@@ -184,6 +283,50 @@ public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest) {
     }
   }
 
+  /** Wrapper for S3 Async client, used to mock the final class DefaultS3AsyncClient */
+  public static class S3AsyncClientWrapper implements S3AsyncClient {
+
+    private final S3AsyncClient delegate;
+
+    public S3AsyncClientWrapper(S3AsyncClient delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public String serviceName() {
+      return delegate.serviceName();
+    }
+
+    @Override
+    public void close() {
+      delegate.close();
+    }
+
+    @Override
+    public <ReturnT> CompletableFuture<ReturnT> getObject(
+        GetObjectRequest getObjectRequest,
+        AsyncResponseTransformer<GetObjectResponse, ReturnT> asyncResponseTransformer) {
+      return delegate.getObject(getObjectRequest, asyncResponseTransformer);
+    }
+
+    @Override
+    public CompletableFuture<HeadObjectResponse> headObject(HeadObjectRequest headObjectRequest) {
+      return delegate.headObject(headObjectRequest);
+    }
+
+    @Override
+    public CompletableFuture<PutObjectResponse> putObject(
+        PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) {
+      return delegate.putObject(putObjectRequest, requestBody);
+    }
+
+    @Override
+    public CompletableFuture<CreateBucketResponse> createBucket(
+        CreateBucketRequest createBucketRequest) {
+      return delegate.createBucket(createBucketRequest);
+    }
+  }
+
   static class FlakyInputStream extends InputStream {
     private final ResponseInputStream<GetObjectResponse> delegate;
     private final AtomicInteger counter;
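The body of `FlakyInputStream` is elided in the hunk above. For context, a minimal sketch of the failure-injection pattern it presumably implements (throwing the configured exception until the counter is exhausted, which is what lets the tests count `resetForRetry()` invocations); this is assumed behavior, not code copied from the class.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Assumed shape: each read fails with `failure` until `counter` reaches zero,
// so a stream built with AtomicInteger(3) fails twice and then succeeds.
class FlakyInputStreamSketch extends InputStream {
  private final InputStream delegate;
  private final AtomicInteger counter;
  private final IOException failure;

  FlakyInputStreamSketch(InputStream delegate, AtomicInteger counter, IOException failure) {
    this.delegate = delegate;
    this.counter = counter;
    this.failure = failure;
  }

  @Override
  public int read() throws IOException {
    if (counter.decrementAndGet() > 0) {
      throw failure; // injected fault; retried reads eventually succeed
    }
    return delegate.read();
  }
}
```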
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java
index 9955aa7f8459..4f56455afcc1 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java
@@ -22,12 +22,16 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.UUID;
+import java.util.concurrent.CompletionException;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.MinIOContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
@@ -72,4 +76,41 @@ void validateS3ConditionalWrites() {
     String responseBody = getResponse.asUtf8String();
     assertThat(responseBody).isEqualTo("test-payload-0");
   }
+
+  @Test
+  void validateS3ConditionalWritesUsingAsyncClient() {
+    S3AsyncClient s3AsyncClient = MinioUtil.createS3AsyncClient(MINIO);
+
+    String bucket = "test-bucket-" + UUID.randomUUID();
+
+    CreateBucketResponse createBucketResponse =
+        s3AsyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).join();
+    assertThat(createBucketResponse.sdkHttpResponse().isSuccessful()).isTrue();
+
+    String key = "test-key-" + UUID.randomUUID();
+    for (int i = 0; i < 5; i++) {
+      String payload = "test-payload-" + i;
+      PutObjectRequest request =
+          PutObjectRequest.builder().bucket(bucket).key(key).ifNoneMatch("*").build();
+      AsyncRequestBody body = AsyncRequestBody.fromString(payload);
+      if (i == 0) {
+        PutObjectResponse response = s3AsyncClient.putObject(request, body).join();
+        assertThat(response.sdkHttpResponse().isSuccessful()).isTrue();
+      } else {
+        assertThatThrownBy(() -> s3AsyncClient.putObject(request, body).join())
+            .isInstanceOf(CompletionException.class)
+            .hasCauseInstanceOf(S3Exception.class)
+            .hasMessageContaining("Service: S3, Status Code: 412")
+            .hasMessageContaining("At least one of the pre-conditions you specified did not hold");
+      }
+    }
+
+    String responseBody =
+        s3AsyncClient
+            .getObject(
+                request -> request.bucket(bucket).key(key), AsyncResponseTransformer.toBytes())
+            .join()
+            .asUtf8String();
+    assertThat(responseBody).isEqualTo("test-payload-0");
+  }
 }
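One detail the async test relies on: `CompletableFuture.join()` wraps failures in `CompletionException`, which is why the assertion checks `hasCauseInstanceOf(S3Exception.class)` rather than the thrown type itself. A small hedged sketch of the unwrapping pattern:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinUnwrapSketch {
  // Recovers the original service exception (e.g. an S3Exception with status 412)
  // from the CompletionException that join() throws on failure.
  static Throwable rootCause(CompletableFuture<?> future) {
    try {
      future.join();
      return null; // completed normally
    } catch (CompletionException e) {
      return e.getCause();
    }
  }
}
```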
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index ebee07e53e4c..0818a3fa7206 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.aws.s3;
 
+import static org.apache.iceberg.aws.s3.S3TestUtil.mergeProperties;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.AdditionalAnswers.delegatesTo;
@@ -119,7 +120,10 @@ public class TestS3FileIO {
   private final SerializableSupplier<S3Client> s3 =
       () -> MinioUtil.createS3Client(minio, legacyMd5PluginEnabled());
+  private final SerializableSupplier<S3AsyncClient> s3Async =
+      () -> MinioUtil.createS3AsyncClient(minio);
   private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3.get()));
+  private final S3AsyncClient s3Asyncmock = mock(S3AsyncClient.class, delegatesTo(s3Async.get()));
   private final Random random = new Random(1);
   private final int numBucketsForBatchDeletion = 3;
   private final String batchDeletionBucketPrefix = "batch-delete-";
@@ -148,13 +152,14 @@ protected boolean legacyMd5PluginEnabled() {
   @BeforeEach
   public void before() {
-    s3FileIO = new S3FileIO(() -> s3mock);
+    s3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
     s3FileIO.initialize(properties);
     createBucket(S3_GENERAL_PURPOSE_BUCKET);
     for (int i = 1; i <= numBucketsForBatchDeletion; i++) {
       createBucket(batchDeletionBucketPrefix + i);
     }
     StaticClientFactory.client = s3mock;
+    StaticClientFactory.asyncClient = s3Asyncmock;
   }
 
   @AfterEach
@@ -164,16 +169,19 @@ public void after() {
     }
   }
 
-  @Test
-  public void testNewInputFile() throws IOException {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testNewInputFile(Map<String, String> aalProperties) throws IOException {
+    final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
+    testS3FileIO.initialize(mergeProperties(aalProperties, properties));
     String location = "s3://bucket/path/to/file.txt";
     byte[] expected = new byte[1024 * 1024];
     random.nextBytes(expected);
 
-    InputFile in = s3FileIO.newInputFile(location);
+    InputFile in = testS3FileIO.newInputFile(location);
     assertThat(in.exists()).isFalse();
 
-    OutputFile out = s3FileIO.newOutputFile(location);
+    OutputFile out = testS3FileIO.newOutputFile(location);
     try (OutputStream os = out.createOrOverwrite()) {
       IOUtil.writeFully(os, ByteBuffer.wrap(expected));
     }
@@ -187,9 +195,10 @@ public void testNewInputFile(Map<String, String> aalProperties) throws IOException {
 
     assertThat(actual).isEqualTo(expected);
 
-    s3FileIO.deleteFile(in);
+    testS3FileIO.deleteFile(in);
 
-    assertThat(s3FileIO.newInputFile(location).exists()).isFalse();
+    assertThat(testS3FileIO.newInputFile(location).exists()).isFalse();
+    testS3FileIO.close();
   }
 
   @Test
@@ -275,8 +284,11 @@ public void testSerializeClient() throws IOException, ClassNotFoundException {
     assertThat(post.get().serviceName()).isEqualTo("s3");
   }
 
-  @Test
-  public void testPrefixList() {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testPrefixList(Map<String, String> aalProperties) {
+    final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
+    testS3FileIO.initialize(mergeProperties(aalProperties, properties));
     String prefix = "s3://bucket/path/to/list";
     List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
 
@@ -287,12 +299,14 @@ public void testPrefixList(Map<String, String> aalProperties) {
           String scalePrefix = String.format("%s/%s/", prefix, scale);
 
           createRandomObjects(scalePrefix, scale);
-          assertThat(Streams.stream(s3FileIO.listPrefix(scalePrefix)).count())
+          assertThat(Streams.stream(testS3FileIO.listPrefix(scalePrefix)).count())
               .isEqualTo((long) scale);
         });
 
     long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
-    assertThat(Streams.stream(s3FileIO.listPrefix(prefix)).count()).isEqualTo(totalFiles);
+    assertThat(Streams.stream(testS3FileIO.listPrefix(prefix)).count()).isEqualTo(totalFiles);
+    testS3FileIO.deletePrefix(prefix);
+    testS3FileIO.close();
   }
 
   /**
@@ -396,18 +410,27 @@ public void testPrefixDelete() {
         });
   }
 
-  @Test
-  public void testReadMissingLocation() {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testReadMissingLocation(Map<String, String> aalProperties) {
+    final Map<String, String> testProperties = mergeProperties(aalProperties, this.properties);
+    final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
+    testS3FileIO.initialize(testProperties);
     String location = "s3://bucket/path/to/data.parquet";
-    InputFile in = s3FileIO.newInputFile(location);
+    InputFile in = testS3FileIO.newInputFile(location);
 
     assertThatThrownBy(() -> in.newStream().read())
         .isInstanceOf(NotFoundException.class)
         .hasMessage("Location does not exist: " + location);
+    testS3FileIO.close();
   }
 
-  @Test
-  public void testMissingTableMetadata() {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testMissingTableMetadata(Map<String, String> aalProperties) {
+    final Map<String, String> testProperties = mergeProperties(aalProperties, this.properties);
+    final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
+    testS3FileIO.initialize(testProperties);
     Map<String, String> conf = Maps.newHashMap();
     conf.put(
         CatalogProperties.URI,
@@ -426,7 +449,7 @@ public void testMissingTableMetadata(Map<String, String> aalProperties) {
       BaseTable table = (BaseTable) catalog.createTable(ident, schema);
 
       // delete the current metadata
-      s3FileIO.deleteFile(table.operations().current().metadataFileLocation());
+      testS3FileIO.deleteFile(table.operations().current().metadataFileLocation());
 
       long start = System.currentTimeMillis();
       // to test NotFoundException, load the table again. refreshing the existing table doesn't
@@ -438,6 +461,7 @@ public void testMissingTableMetadata(Map<String, String> aalProperties) {
       long duration = System.currentTimeMillis() - start;
       assertThat(duration < 10_000).as("Should take less than 10 seconds").isTrue();
     }
+    testS3FileIO.close();
   }
 
   @Test
@@ -572,8 +596,11 @@ public void testResolvingFileIOLoadWithoutConf() {
     assertThat(result).isInstanceOf(S3FileIO.class);
   }
 
-  @Test
-  public void testInputFileWithDataFile() throws IOException {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testInputFileWithDataFile(Map<String, String> aalProperties) throws IOException {
+    final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
+    testS3FileIO.initialize(mergeProperties(aalProperties, properties));
     String location = "s3://bucket/path/to/data-file.parquet";
     DataFile dataFile =
         DataFiles.builder(PartitionSpec.unpartitioned())
@@ -582,22 +609,27 @@ public void testInputFileWithDataFile(Map<String, String> aalProperties) throws IOException {
             .withFormat(FileFormat.PARQUET)
             .withRecordCount(123L)
             .build();
-    OutputStream outputStream = s3FileIO.newOutputFile(location).create();
+    OutputStream outputStream = testS3FileIO.newOutputFile(location).create();
     byte[] data = "testing".getBytes();
     outputStream.write(data);
     outputStream.close();
 
-    InputFile inputFile = s3FileIO.newInputFile(dataFile);
+    InputFile inputFile = testS3FileIO.newInputFile(dataFile);
     reset(s3mock);
 
     assertThat(inputFile.getLength())
         .as("Data file length should be determined from the file size stats")
         .isEqualTo(123L);
     verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
+    testS3FileIO.deleteFile(location);
+    testS3FileIO.close();
   }
 
-  @Test
-  public void testInputFileWithManifest() throws IOException {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testInputFileWithManifest(Map<String, String> aalProperties) throws IOException {
+    final S3FileIO testS3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock);
+    testS3FileIO.initialize(mergeProperties(aalProperties, properties));
     String dataFileLocation = "s3://bucket/path/to/data-file-2.parquet";
     DataFile dataFile =
         DataFiles.builder(PartitionSpec.unpartitioned())
@@ -607,17 +639,19 @@ public void testInputFileWithManifest(Map<String, String> aalProperties) throws IOException {
             .withRecordCount(123L)
             .build();
     String manifestLocation = "s3://bucket/path/to/manifest.avro";
-    OutputFile outputFile = s3FileIO.newOutputFile(manifestLocation);
+    OutputFile outputFile = testS3FileIO.newOutputFile(manifestLocation);
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(PartitionSpec.unpartitioned(), outputFile);
     writer.add(dataFile);
     writer.close();
     ManifestFile manifest = writer.toManifestFile();
-    InputFile inputFile = s3FileIO.newInputFile(manifest);
+    InputFile inputFile = testS3FileIO.newInputFile(manifest);
     reset(s3mock);
 
     assertThat(inputFile.getLength()).isEqualTo(manifest.length());
     verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
+    testS3FileIO.deleteFile(dataFileLocation);
+    testS3FileIO.close();
   }
 
   @ParameterizedTest
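The parameterized tests above repeat an initialize/use/close lifecycle on a per-test `S3FileIO`. A hedged sketch of the same pattern with try-with-resources (`S3FileIO` is closeable, which is why the tests call `close()` explicitly); the suppliers and properties are placeholders here:

```java
import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.InputFile;

class FileIoLifecycleSketch {
  static boolean exists(Map<String, String> props, String location) {
    // Client resources are released when the try block exits.
    try (S3FileIO io = new S3FileIO()) {
      io.initialize(props);
      InputFile file = io.newInputFile(location);
      return file.exists();
    }
  }
}
```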
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
index 5fe410582ce5..ae93e8a9a6d2 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.aws.s3;
 
+import static org.apache.iceberg.aws.s3.S3TestUtil.mergeProperties;
+import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.ByteArrayInputStream;
@@ -49,6 +51,8 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.regions.PartitionMetadata;
 import software.amazon.awssdk.regions.Region;
@@ -159,18 +163,21 @@ public void beforeEach() {
     clientFactory.initialize(Maps.newHashMap());
   }
 
-  @Test
-  public void testNewInputStream() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testNewInputStream(Map<String, String> aalProperties) throws Exception {
     s3.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
         RequestBody.fromBytes(contentBytes));
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(ImmutableMap.of());
+    s3FileIO.initialize(aalProperties);
     validateRead(s3FileIO);
   }
 
-  @Test
-  public void testS3FileIOWithS3FileIOAwsClientFactoryImpl() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testS3FileIOWithS3FileIOAwsClientFactoryImpl(Map<String, String> aalProperties)
+      throws Exception {
     s3.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
         RequestBody.fromBytes(contentBytes));
@@ -179,38 +186,49 @@ public void testS3FileIOWithS3FileIOAwsClientFactoryImpl(Map<String, String> aalProperties)
     properties.put(
         S3FileIOProperties.CLIENT_FACTORY,
         "org.apache.iceberg.aws.s3.DefaultS3FileIOAwsClientFactory");
-    s3FileIO.initialize(properties);
+    s3FileIO.initialize(mergeProperties(aalProperties, properties));
     validateRead(s3FileIO);
   }
 
-  @Test
-  public void testS3FileIOWithDefaultAwsClientFactoryImpl() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testS3FileIOWithDefaultAwsClientFactoryImpl(Map<String, String> aalProperties)
+      throws Exception {
     s3.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
         RequestBody.fromBytes(contentBytes));
     S3FileIO s3FileIO = new S3FileIO();
-    s3FileIO.initialize(Maps.newHashMap());
+    s3FileIO.initialize(aalProperties);
     validateRead(s3FileIO);
   }
 
-  @Test
-  public void testNewInputStreamWithAccessPoint() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testNewInputStreamWithAccessPoint(Map<String, String> aalProperties)
+      throws Exception {
     requireAccessPointSupport();
     s3.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
         RequestBody.fromBytes(contentBytes));
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> testProperties =
         ImmutableMap.of(
             S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName,
-            testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName)));
+            testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName));
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     validateRead(s3FileIO);
   }
 
-  @Test
-  public void testCrossRegionAccessEnabled() throws Exception {
-    clientFactory.initialize(
-        ImmutableMap.of(S3FileIOProperties.CROSS_REGION_ACCESS_ENABLED, "true"));
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testCrossRegionAccessEnabled(Map<String, String> aalProperties) throws Exception {
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.CROSS_REGION_ACCESS_ENABLED, "true");
+    Map<String, String> properties = mergeProperties(aalProperties, testProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        new S3FileIOProperties(properties),
+        "The S3 async client required by the Analytics Accelerator Library does not support cross-region access");
+    clientFactory.initialize(properties);
     S3Client s3Client = clientFactory.s3();
     String crossBucketObjectKey = String.format("%s/%s", prefix, UUID.randomUUID());
     String crossBucketObjectUri =
@@ -224,6 +242,7 @@ public void testCrossRegionAccessEnabled(Map<String, String> aalProperties) throws Exception {
           RequestBody.fromBytes(contentBytes));
       // make a copy in cross-region bucket
       S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
+      s3FileIO.initialize(aalProperties);
       validateRead(s3FileIO, crossBucketObjectUri);
     } finally {
       AwsIntegTestUtil.cleanS3GeneralPurposeBucket(
@@ -231,10 +250,18 @@ public void testCrossRegionAccessEnabled(Map<String, String> aalProperties) throws Exception {
     }
   }
 
-  @Test
-  public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testNewInputStreamWithCrossRegionAccessPoint(Map<String, String> aalProperties)
+      throws Exception {
     requireAccessPointSupport();
-    clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true");
+    Map<String, String> properties = mergeProperties(aalProperties, testProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        new S3FileIOProperties(properties),
+        "The S3 async client required by the Analytics Accelerator Library does not support cross-region access points");
+    clientFactory.initialize(properties);
     S3Client s3Client = clientFactory.s3();
     s3Client.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
@@ -248,38 +275,33 @@ public void testNewInputStreamWithCrossRegionAccessPoint(Map<String, String> aalProperties)
             .build(),
         RequestBody.fromBytes(contentBytes));
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> accessPointProperties =
         ImmutableMap.of(
             S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName,
-            testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName)));
+            testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName));
+    s3FileIO.initialize(mergeProperties(aalProperties, accessPointProperties));
     validateRead(s3FileIO);
   }
 
-  @Test
-  public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testNewInputStreamWithMultiRegionAccessPoint(Map<String, String> aalProperties)
+      throws Exception {
     Assumptions.assumeThat(multiRegionAccessPointAlias).isNotEmpty();
-    clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true");
+    clientFactory.initialize(mergeProperties(aalProperties, testProperties));
     S3Client s3Client = clientFactory.s3();
     s3Client.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
         RequestBody.fromBytes(contentBytes));
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> accessPointProperties =
         ImmutableMap.of(
             S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName,
             testMultiRegionAccessPointARN(
-                AwsIntegTestUtil.testRegion(), multiRegionAccessPointAlias)));
-    validateRead(s3FileIO);
-  }
-
-  @Test
-  public void testNewInputStreamWithAnalyticsAccelerator() throws Exception {
-    s3.putObject(
-        PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
-        RequestBody.fromBytes(contentBytes));
-    S3FileIO s3FileIO = new S3FileIO();
-    s3FileIO.initialize(
-        ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true)));
+                AwsIntegTestUtil.testRegion(), multiRegionAccessPointAlias));
+    s3FileIO.initialize(mergeProperties(aalProperties, accessPointProperties));
     validateRead(s3FileIO);
   }
 
@@ -397,11 +419,13 @@ public void testNewOutputStreamWithAnalyticsAccelerator() throws Exception {
     }
   }
 
-  @Test
-  public void testServerSideS3Encryption() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testServerSideS3Encryption(Map<String, String> aalProperties) throws Exception {
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
-        ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_S3));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_S3);
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     write(s3FileIO);
     validateRead(s3FileIO);
     GetObjectResponse response =
@@ -410,16 +434,18 @@ public void testServerSideS3Encryption(Map<String, String> aalProperties) throws Exception {
     assertThat(response.serverSideEncryption()).isEqualTo(ServerSideEncryption.AES256);
   }
 
-  @Test
-  public void testServerSideKmsEncryption() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testServerSideKmsEncryption(Map<String, String> aalProperties) throws Exception {
     requireKMSEncryptionSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> testProperties =
         ImmutableMap.of(
             S3FileIOProperties.SSE_TYPE,
             S3FileIOProperties.SSE_TYPE_KMS,
             S3FileIOProperties.SSE_KEY,
-            kmsKeyArn));
+            kmsKeyArn);
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     write(s3FileIO);
     validateRead(s3FileIO);
     GetObjectResponse response =
@@ -429,12 +455,15 @@ public void testServerSideKmsEncryption(Map<String, String> aalProperties) throws Exception {
     assertThat(kmsKeyArn).isEqualTo(response.ssekmsKeyId());
   }
 
-  @Test
-  public void testServerSideKmsEncryptionWithDefaultKey() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testServerSideKmsEncryptionWithDefaultKey(Map<String, String> aalProperties)
+      throws Exception {
     requireKMSEncryptionSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
-        ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_KMS));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_KMS);
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     write(s3FileIO);
     validateRead(s3FileIO);
     GetObjectResponse response =
@@ -451,16 +480,19 @@ public void testServerSideKmsEncryptionWithDefaultKey(Map<String, String> aalProperties)
             aliasListEntry -> assertThat(aliasListEntry.aliasName()).isEqualTo("alias/aws/s3"));
   }
 
-  @Test
-  public void testDualLayerServerSideKmsEncryption() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testDualLayerServerSideKmsEncryption(Map<String, String> aalProperties)
+      throws Exception {
     requireKMSEncryptionSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> testProperties =
         ImmutableMap.of(
             S3FileIOProperties.SSE_TYPE,
             S3FileIOProperties.DSSE_TYPE_KMS,
             S3FileIOProperties.SSE_KEY,
-            kmsKeyArn));
+            kmsKeyArn);
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     write(s3FileIO);
     validateRead(s3FileIO);
     GetObjectResponse response =
@@ -470,8 +502,9 @@ public void testDualLayerServerSideKmsEncryption(Map<String, String> aalProperties)
     assertThat(response.ssekmsKeyId()).isEqualTo(kmsKeyArn);
   }
 
-  @Test
-  public void testServerSideCustomEncryption() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testServerSideCustomEncryption(Map<String, String> aalProperties) throws Exception {
     requireKMSEncryptionSupport();
     // generate key
     KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
@@ -485,14 +518,16 @@ public void testServerSideCustomEncryption(Map<String, String> aalProperties) throws Exception {
         new String(encoder.encode(digest.digest(secretKey.getEncoded())), StandardCharsets.UTF_8);
 
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> testProperties =
         ImmutableMap.of(
-            S3FileIOProperties.SSE_TYPE,
-            S3FileIOProperties.SSE_TYPE_CUSTOM,
-            S3FileIOProperties.SSE_KEY,
-            encodedKey,
-            S3FileIOProperties.SSE_MD5,
-            md5));
+            S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_CUSTOM,
+            S3FileIOProperties.SSE_KEY, encodedKey,
+            S3FileIOProperties.SSE_MD5, md5);
+    Map<String, String> properties = mergeProperties(aalProperties, testProperties);
+    skipIfAnalyticsAcceleratorEnabled(
+        new S3FileIOProperties(properties),
+        "Analytics Accelerator Library does not support server-side custom encryption");
+    s3FileIO.initialize(properties);
     write(s3FileIO);
     validateRead(s3FileIO);
     GetObjectResponse response =
@@ -510,13 +545,15 @@ public void testServerSideCustomEncryption(Map<String, String> aalProperties) throws Exception {
     assertThat(response.sseCustomerKeyMD5()).isEqualTo(md5);
   }
 
-  @Test
-  public void testACL() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testACL(Map<String, String> aalProperties) throws Exception {
     requireACLSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> testProperties =
         ImmutableMap.of(
-            S3FileIOProperties.ACL, ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL.toString()));
+            S3FileIOProperties.ACL, ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL.toString());
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     write(s3FileIO);
     validateRead(s3FileIO);
 
@@ -529,65 +566,81 @@ public void testACL(Map<String, String> aalProperties) throws Exception {
         .satisfies(grant -> assertThat(grant.permission()).isEqualTo(Permission.FULL_CONTROL));
   }
 
-  @Test
-  public void testClientFactorySerialization() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testClientFactorySerialization(Map<String, String> aalProperties) throws Exception {
     S3FileIO fileIO = new S3FileIO(clientFactory::s3);
-    fileIO.initialize(ImmutableMap.of());
+    fileIO.initialize(aalProperties);
     write(fileIO);
     byte[] data = TestHelpers.serialize(fileIO);
     S3FileIO fileIO2 = TestHelpers.deserialize(data);
     validateRead(fileIO2);
   }
 
-  @Test
-  public void testDeleteFilesMultipleBatches() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testDeleteFilesMultipleBatches(Map<String, String> aalProperties) throws Exception {
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
-        ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize)));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize));
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     testDeleteFiles(deletionBatchSize * 2, s3FileIO);
   }
 
-  @Test
-  public void testDeleteFilesMultipleBatchesWithAccessPoints() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testDeleteFilesMultipleBatchesWithAccessPoints(Map<String, String> aalProperties)
+      throws Exception {
     requireAccessPointSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> testProperties =
         ImmutableMap.of(
             S3FileIOProperties.DELETE_BATCH_SIZE,
             Integer.toString(deletionBatchSize),
             S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName,
-            testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName)));
+            testAccessPointARN(AwsIntegTestUtil.testRegion(), accessPointName));
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     testDeleteFiles(deletionBatchSize * 2, s3FileIO);
   }
 
-  @Test
-  public void testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints(
+      Map<String, String> aalProperties) throws Exception {
     requireKMSEncryptionSupport();
-    clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true");
+    clientFactory.initialize(mergeProperties(aalProperties, testProperties));
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
+    Map<String, String> accessPointProperties =
         ImmutableMap.of(
             S3FileIOProperties.DELETE_BATCH_SIZE,
             Integer.toString(deletionBatchSize),
             S3FileIOProperties.ACCESS_POINTS_PREFIX + bucketName,
-            testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName)));
+            testAccessPointARN(AwsIntegTestUtil.testCrossRegion(), crossRegionAccessPointName));
+    s3FileIO.initialize(mergeProperties(aalProperties, accessPointProperties));
     testDeleteFiles(deletionBatchSize * 2, s3FileIO);
   }
 
-  @Test
-  public void testDeleteFilesLessThanBatchSize() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testDeleteFilesLessThanBatchSize(Map<String, String> aalProperties)
+      throws Exception {
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
-        ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize)));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize));
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     testDeleteFiles(deletionBatchSize - 1, s3FileIO);
   }
 
-  @Test
-  public void testDeleteFilesSingleBatchWithRemainder() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testDeleteFilesSingleBatchWithRemainder(Map<String, String> aalProperties)
+      throws Exception {
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(
-        ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize)));
+    Map<String, String> testProperties =
+        ImmutableMap.of(S3FileIOProperties.DELETE_BATCH_SIZE, Integer.toString(deletionBatchSize));
+    s3FileIO.initialize(mergeProperties(aalProperties, testProperties));
     testDeleteFiles(5, s3FileIO);
   }
 
@@ -629,11 +682,12 @@ public void testPrefixDelete() {
         });
   }
 
-  @Test
-  public void testFileRecoveryHappyPath() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testFileRecoveryHappyPath(Map<String, String> aalProperties) throws Exception {
     requireVersioningSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(ImmutableMap.of());
+    s3FileIO.initialize(aalProperties);
     String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, "someFile.parquet");
     write(s3FileIO, filePath);
     s3FileIO.deleteFile(filePath);
@@ -641,13 +695,15 @@ public void testFileRecoveryHappyPath(Map<String, String> aalProperties) throws Exception {
 
     assertThat(s3FileIO.recoverFile(filePath)).isTrue();
     assertThat(s3FileIO.newInputFile(filePath).exists()).isTrue();
+    s3FileIO.deleteFile(filePath);
   }
 
-  @Test
-  public void testFileRecoveryFailsToRecover() throws Exception {
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testFileRecoveryFailsToRecover(Map<String, String> aalProperties) throws Exception {
     requireVersioningSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
-    s3FileIO.initialize(ImmutableMap.of());
+    s3FileIO.initialize(aalProperties);
     s3.putBucketVersioning(
         PutBucketVersioningRequest.builder()
             .bucket(bucketName)
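All of the parameterizations above hinge on a single switch. A hedged sketch of turning the accelerator on for a standalone `S3FileIO`; it relies only on the `S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED` constant referenced throughout this diff, not on any literal property string:

```java
import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class EnableAalSketch {
  static S3FileIO newAcceleratedFileIO() {
    Map<String, String> props =
        ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, "true");
    S3FileIO io = new S3FileIO();
    io.initialize(props); // reads served through the Analytics Accelerator Library
    return io;
  }
}
```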
org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.ParquetObjectRange; import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException; import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException; @@ -44,6 +56,7 @@ public class TestS3InputStream { @Container private static final MinIOContainer MINIO = MinioUtil.createContainer(); private final S3Client s3 = MinioUtil.createS3Client(MINIO); + private final S3AsyncClient s3Async = MinioUtil.createS3AsyncClient(MINIO); private final Random random = new Random(1); @BeforeEach @@ -51,23 +64,37 @@ public void before() { createBucket("bucket"); } - @Test - public void testRead() throws Exception { - testRead(s3); + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testRead(Map aalProperties) throws Exception { + testRead(s3, s3Async, aalProperties); } - S3InputStream newInputStream(S3Client s3Client, S3URI uri) { + SeekableInputStream newInputStream( + S3Client s3Client, + S3AsyncClient s3AsyncClient, + S3URI uri, + Map aalProperties) { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) { + PrefixedS3Client client = + new PrefixedS3Client("s3", aalProperties, () -> s3Client, () -> s3AsyncClient); + return AnalyticsAcceleratorUtil.newStream( + S3InputFile.fromLocation(uri.location(), client, MetricsContext.nullMetrics())); + } return new S3InputStream(s3Client, uri); } - protected void testRead(S3Client s3Client) throws Exception { + protected void testRead( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/read.dat"); int dataSize = 1024 * 1024 * 10; byte[] data = randomData(dataSize); writeS3Data(uri, data); - try (SeekableInputStream in = newInputStream(s3Client, uri)) { + try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) { int readSize = 1024; readAndCheck(in, in.getPos(), readSize, data, false); readAndCheck(in, in.getPos(), readSize, data, true); @@ -114,12 +141,18 @@ private void readAndCheck( assertThat(actual).isEqualTo(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd)); } - @Test - 
public void testRangeRead() throws Exception { - testRangeRead(s3); + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testRangeRead(Map aalProperties) throws Exception { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator Library does not support range reads"); + testRangeRead(s3, s3Async, aalProperties); } - protected void testRangeRead(S3Client s3Client) throws Exception { + protected void testRangeRead( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat"); int dataSize = 1024 * 1024 * 10; byte[] expected = randomData(dataSize); @@ -131,7 +164,8 @@ protected void testRangeRead(S3Client s3Client) throws Exception { writeS3Data(uri, expected); - try (RangeReadable in = newInputStream(s3Client, uri)) { + try (RangeReadable in = + (RangeReadable) newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) { // first 1k position = 0; offset = 0; @@ -160,28 +194,35 @@ private void readAndCheckRanges( .isEqualTo(Arrays.copyOfRange(original, offset, offset + length)); } - @Test - public void testClose() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testClose(Map aalProperties) throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/closed.dat"); - SeekableInputStream closed = newInputStream(s3, uri); + int dataSize = 1024 * 1024 * 10; + byte[] data = randomData(dataSize); + writeS3Data(uri, data); + SeekableInputStream closed = newInputStream(s3, s3Async, uri, aalProperties); closed.close(); assertThatThrownBy(() -> closed.seek(0)) .isInstanceOf(IllegalStateException.class) .hasMessage("already closed"); } - @Test - public void testSeek() throws Exception { - testSeek(s3); + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testSeek(Map aalProperties) throws Exception { + testSeek(s3, s3Async, aalProperties); } - protected void testSeek(S3Client s3Client) throws Exception { + protected void testSeek( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/seek.dat"); byte[] expected = randomData(1024 * 1024); writeS3Data(uri, expected); - try (SeekableInputStream in = newInputStream(s3Client, uri)) { + try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) { in.seek(expected.length / 2); byte[] actual = new byte[expected.length / 2]; IOUtil.readFully(in, actual, 0, expected.length / 2); @@ -190,6 +231,143 @@ protected void testSeek(S3Client s3Client) throws Exception { } } + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testVectoredRead(Map aalProperties) throws Exception { + testVectoredRead(s3, s3Async, aalProperties); + } + + protected void testVectoredRead( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + assumeThat(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()).isTrue(); + + S3URI uri = new S3URI("s3://bucket/path/to/vectored-read.dat"); + int dataSize = 1024 * 1024 * 10; + byte[] data = 
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testVectoredRead(Map<String, String> aalProperties) throws Exception {
+    testVectoredRead(s3, s3Async, aalProperties);
+  }
+
+  protected void testVectoredRead(
+      S3Client s3Client, S3AsyncClient s3AsyncClient, Map<String, String> aalProperties)
+      throws Exception {
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    assumeThat(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()).isTrue();
+
+    S3URI uri = new S3URI("s3://bucket/path/to/vectored-read.dat");
+    int dataSize = 1024 * 1024 * 10;
+    byte[] data = randomData(dataSize);
+
+    writeS3Data(uri, data);
+
+    try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) {
+      IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+      assertThat(in.readVectoredAvailable(allocate)).isTrue();
+
+      List<ParquetObjectRange> ranges = Lists.newArrayList();
+      CompletableFuture<ByteBuffer> future1 = new CompletableFuture<>();
+      CompletableFuture<ByteBuffer> future2 = new CompletableFuture<>();
+      CompletableFuture<ByteBuffer> future3 = new CompletableFuture<>();
+
+      // First range: first 1024 bytes
+      int range1Offset = 0;
+      int range1Length = 1024;
+      ranges.add(new ParquetObjectRange(future1, range1Offset, range1Length));
+
+      // Second range: 2048 bytes from the middle of the object
+      int range2Offset = dataSize / 2;
+      int range2Length = 2048;
+      ranges.add(new ParquetObjectRange(future2, range2Offset, range2Length));
+
+      // Third range: last 1024 bytes
+      int range3Offset = dataSize - 1024;
+      int range3Length = 1024;
+      ranges.add(new ParquetObjectRange(future3, range3Offset, range3Length));
+
+      in.readVectored(ranges, allocate);
+
+      ByteBuffer buffer1 = future1.get();
+      ByteBuffer buffer2 = future2.get();
+      ByteBuffer buffer3 = future3.get();
+
+      assertThat(future1.isDone()).isTrue();
+      assertThat(future2.isDone()).isTrue();
+      assertThat(future3.isDone()).isTrue();
+
+      assertThat(buffer1.limit()).isEqualTo(range1Length);
+      assertThat(buffer2.limit()).isEqualTo(range2Length);
+      assertThat(buffer3.limit()).isEqualTo(range3Length);
+
+      byte[] range1Data = new byte[range1Length];
+      byte[] range2Data = new byte[range2Length];
+      byte[] range3Data = new byte[range3Length];
+
+      buffer1.get(range1Data);
+      buffer2.get(range2Data);
+      buffer3.get(range3Data);
+
+      assertThat(range1Data)
+          .isEqualTo(Arrays.copyOfRange(data, range1Offset, range1Offset + range1Length));
+      assertThat(range2Data)
+          .isEqualTo(Arrays.copyOfRange(data, range2Offset, range2Offset + range2Length));
+      assertThat(range3Data)
+          .isEqualTo(Arrays.copyOfRange(data, range3Offset, range3Offset + range3Length));
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
+  public void testVectoredReadWithNonContinuousRanges(Map<String, String> aalProperties)
+      throws Exception {
+    testVectoredReadWithNonContinuousRanges(s3, s3Async, aalProperties);
+  }
+
+  protected void testVectoredReadWithNonContinuousRanges(
+      S3Client s3Client, S3AsyncClient s3AsyncClient, Map<String, String> aalProperties)
+      throws Exception {
+
+    final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties);
+    assumeThat(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()).isTrue();
+
+    S3URI uri = new S3URI("s3://bucket/path/to/vectored-read-non-continuous.dat");
+    int dataSize = 1024 * 1024;
+    byte[] data = randomData(dataSize);
+
+    writeS3Data(uri, data);
+
+    try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) {
+      List<ParquetObjectRange> ranges = Lists.newArrayList();
+      CompletableFuture<ByteBuffer> future1 = new CompletableFuture<>();
+      CompletableFuture<ByteBuffer> future2 = new CompletableFuture<>();
+
+      // First range: bytes [0, 1024)
+      int range1Offset = 0;
+      int range1Length = 1024;
+      ranges.add(new ParquetObjectRange(future1, range1Offset, range1Length));
+
+      // Second range: 3400 bytes starting at offset 2000, leaving a gap after the first range
+      int range2Offset = 2000;
+      int range2Length = 3400;
+      ranges.add(new ParquetObjectRange(future2, range2Offset, range2Length));
+
+      // Call readVectored
+      IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+      in.readVectored(ranges, allocate);
+
+      // Verify the buffers have the expected content
+      ByteBuffer buffer1 = future1.get();
+      ByteBuffer buffer2 = future2.get();
+
+      // Verify the futures were completed
+      assertThat(future1.isDone()).isTrue();
+      assertThat(future2.isDone()).isTrue();
+
+      assertThat(buffer1.limit()).isEqualTo(range1Length);
+      assertThat(buffer2.limit()).isEqualTo(range2Length);
+
+      // Verify the buffer content matches the original data
+      byte[] range1Data = new byte[range1Length];
+      byte[] range2Data = new byte[range2Length];
+
+      buffer1.get(range1Data);
+      buffer2.get(range2Data);
+
+      assertThat(range1Data)
+          .isEqualTo(Arrays.copyOfRange(data, range1Offset, range1Offset + range1Length));
+      assertThat(range2Data)
+          .isEqualTo(Arrays.copyOfRange(data, range2Offset, range2Offset + range2Length));
+    }
+  }
+
   private byte[] randomData(int size) {
     byte[] data = new byte[size];
     random.nextBytes(data);
@@ -217,4 +395,8 @@ private void createBucket(String bucketName) {
   protected S3Client s3Client() {
     return s3;
   }
+
+  protected S3AsyncClient s3AsyncClient() {
+    return s3Async;
+  }
 }
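The base SeekableInputStream added in this patch throws UnsupportedOperationException from readVectored and returns false from readVectoredAvailable, while the AAL wrapper in the next file opts in, so callers are expected to probe before dispatching. The sketch below shows that dispatch, assuming ParquetObjectRange's future is typed to ByteBuffer as elsewhere in the patch; RangeDispatchSketch and readRanges are hypothetical names, and the sequential fallback is illustrative rather than taken from this patch.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.function.IntFunction;
    import org.apache.iceberg.io.IOUtil;
    import org.apache.iceberg.io.ParquetObjectRange;
    import org.apache.iceberg.io.SeekableInputStream;

    class RangeDispatchSketch {
      // Hypothetical helper: one vectored call when the stream supports it,
      // otherwise a seek-and-read fallback per range.
      static void readRanges(SeekableInputStream in, List<ParquetObjectRange> ranges)
          throws IOException {
        IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
        if (in.readVectoredAvailable(allocate)) {
          in.readVectored(ranges, allocate); // futures complete asynchronously
          return;
        }
        for (ParquetObjectRange range : ranges) {
          byte[] bytes = new byte[range.getLength()];
          in.seek(range.getOffset());
          IOUtil.readFully(in, bytes, 0, bytes.length);
          range.getByteBuffer().complete(ByteBuffer.wrap(bytes)); // complete manually
        }
      }
    }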
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java
index 197f79905f0f..c17b765a01a5 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java
@@ -19,13 +19,25 @@ package org.apache.iceberg.aws.s3;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.ParquetObjectRange;
 import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
 
 /** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */
 class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AnalyticsAcceleratorInputStreamWrapper.class);
+
   private final S3SeekableInputStream delegate;
+  private boolean closed = false;
 
   AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) {
     this.delegate = stream;
@@ -33,21 +45,26 @@ class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {
 
   @Override
   public int read() throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
     return this.delegate.read();
   }
 
   @Override
   public int read(byte[] b) throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
     return this.delegate.read(b, 0, b.length);
   }
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
     return this.delegate.read(b, off, len);
   }
 
   @Override
   public void seek(long l) throws IOException {
+    Preconditions.checkState(!closed, "already closed");
+    Preconditions.checkArgument(l >= 0, "position is negative: %s", l);
     this.delegate.seek(l);
   }
 
@@ -58,6 +75,32 @@ public long getPos() {
 
   @Override
   public void close() throws IOException {
+    this.closed = true;
     this.delegate.close();
   }
+
+  private static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
+      buffer -> LOG.debug("Released buffer of length {}: {}", buffer.limit(), buffer);
+
+  @Override
+  public void readVectored(List<ParquetObjectRange> ranges, IntFunction<ByteBuffer> allocate)
+      throws IOException {
+    Preconditions.checkState(!closed, "Cannot read: already closed");
+    this.delegate.readVectored(convertRanges(ranges), allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  public static List<software.amazon.s3.analyticsaccelerator.common.ObjectRange> convertRanges(
+      List<ParquetObjectRange> ranges) {
+    return ranges.stream()
+        .map(
+            range ->
+                new software.amazon.s3.analyticsaccelerator.common.ObjectRange(
+                    range.getByteBuffer(), range.getOffset(), range.getLength()))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean readVectoredAvailable(IntFunction<ByteBuffer> allocate) {
+    return true;
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
index ed1c3a4879e3..12e303919a7c 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
@@ -22,6 +22,7 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
 import java.io.IOException;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.util.Pair;
@@ -29,6 +30,7 @@
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
@@ -58,7 +60,12 @@ private AnalyticsAcceleratorUtil() {}
 
   public static SeekableInputStream newStream(S3InputFile inputFile) {
     S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key());
-    HeadObjectResponse metadata = inputFile.getObjectMetadata();
+    HeadObjectResponse metadata;
+    try {
+      metadata = inputFile.getObjectMetadata();
+    } catch (NoSuchKeyException e) {
+      throw new NotFoundException(e, "Location does not exist: %s", inputFile.location());
+    }
     OpenStreamInformation openStreamInfo =
         OpenStreamInformation.builder()
             .objectMetadata(
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
index 6bf582f00bbb..38e604b271ce 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -83,7 +83,7 @@ public class S3FileIOProperties implements Serializable {
    */
   public static final String S3_ANALYTICS_ACCELERATOR_ENABLED = "s3.analytics-accelerator.enabled";
 
-  public static final boolean S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
+  public static final boolean S3_ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = true;
 
   /**
    * This prefix allows users to configure the internal properties of the s3 analytics accelerator.
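Because the default above flips from false to true, the accelerator is now enabled unless a user explicitly opts out, and an S3FileIOProperties built from an empty property map will report it as on. A small sketch of both toggles, using the Map-based constructor the tests in this patch rely on (AalToggleSketch is an illustrative name):

    import java.util.Map;
    import org.apache.iceberg.aws.s3.S3FileIOProperties;

    class AalToggleSketch {
      public static void main(String[] args) {
        // With the default flipped to true, an empty property map enables AAL.
        S3FileIOProperties defaults = new S3FileIOProperties(Map.of());
        System.out.println(defaults.isS3AnalyticsAcceleratorEnabled()); // true

        // Opting out is now explicit.
        Map<String, String> optOut =
            Map.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, "false");
        System.out.println(new S3FileIOProperties(optOut).isS3AnalyticsAcceleratorEnabled()); // false
      }
    }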
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java
index 76f47adb17e8..d9cb2871f825 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java
@@ -28,6 +28,7 @@ class StaticClientFactory implements AwsClientFactory {
 
   static S3Client client;
+  static S3AsyncClient asyncClient;
 
   @Override
   public S3Client s3() {
@@ -36,7 +37,7 @@ public S3Client s3() {
 
   @Override
   public S3AsyncClient s3Async() {
-    return null;
+    return asyncClient;
   }
 
   @Override
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d7e41f597abc..693fba5f8e3a 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -22,7 +22,7 @@
 [versions]
 activation = "1.1.1"
 aliyun-sdk-oss = "3.10.2"
-analyticsaccelerator = "1.0.0"
+analyticsaccelerator = "1.2.1"
 antlr = "4.9.3"
 antlr413 = "4.13.1" # For Spark 4.0 support
 aircompressor = "0.27"
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
index b36acdbd6f8d..e4d90934afb3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java
@@ -21,6 +21,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -29,11 +34,14 @@
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.DelegatingInputStream;
 import org.apache.iceberg.io.DelegatingOutputStream;
+import org.apache.iceberg.io.ParquetObjectRange;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.DelegatingPositionOutputStream;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.ParquetFileRange;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.SeekableInputStream;
@@ -121,6 +129,33 @@ public long getPos() throws IOException {
     public void seek(long newPos) throws IOException {
       delegate.seek(newPos);
     }
+
+    @Override
+    public boolean readVectoredAvailable(ByteBufferAllocator allocate) {
+      IntFunction<ByteBuffer> delegateAllocate = allocate::allocate;
+      return delegate.readVectoredAvailable(delegateAllocate);
+    }
+
+    @Override
+    public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocate)
+        throws IOException {
+      IntFunction<ByteBuffer> delegateAllocate = allocate::allocate;
+      List<ParquetObjectRange> delegateRanges = convertRanges(ranges);
+
+      delegate.readVectored(delegateRanges, delegateAllocate);
+    }
+
+    private static List<ParquetObjectRange> convertRanges(List<ParquetFileRange> ranges) {
+      return ranges.stream()
+          .map(
+              parquetFileRange -> {
+                CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+                parquetFileRange.setDataReadFuture(result);
+                return new ParquetObjectRange(
+                    result, parquetFileRange.getOffset(), parquetFileRange.getLength());
+              })
+          .collect(Collectors.toList());
+    }
   }
 
   private static class ParquetOutputStreamAdapter extends DelegatingPositionOutputStream {
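The convertRanges helper in the adapter above is the glue between the two vectored APIs: Parquet supplies ParquetFileRange objects with no future attached, the adapter creates one CompletableFuture per range, registers it on the Parquet range via setDataReadFuture, and forwards the same future inside an Iceberg ParquetObjectRange, so completing the Iceberg-side read completes Parquet's future as well. A condensed caller-side sketch of that round trip, assuming parquet-java's ParquetFileRange and getDataReadFuture API; the class name, method name, and offsets are purely illustrative.

    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;
    import org.apache.parquet.io.ParquetFileRange;
    import org.apache.parquet.io.SeekableInputStream;

    class ParquetVectoredSketch {
      // `stream` would be the ParquetInputStreamAdapter above; offsets and
      // lengths here are illustrative, not real Parquet layout.
      static ByteBuffer readSecondRange(SeekableInputStream stream) throws Exception {
        ParquetFileRange range1 = new ParquetFileRange(0, 1024);
        ParquetFileRange range2 = new ParquetFileRange(8192, 256);

        // The adapter attaches a CompletableFuture to each ParquetFileRange via
        // setDataReadFuture and forwards it inside an Iceberg ParquetObjectRange.
        stream.readVectored(List.of(range1, range2), new HeapByteBufferAllocator());

        // Completing the Iceberg-side range completes Parquet's future too.
        return range2.getDataReadFuture().get();
      }
    }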