diff --git a/api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java b/api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java
new file mode 100644
index 000000000000..7de5349a94bb
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ParquetObjectRange {
+  public CompletableFuture<ByteBuffer> getByteBuffer() {
+    return byteBuffer;
+  }
+
+  public void setByteBuffer(CompletableFuture<ByteBuffer> byteBuffer) {
+    this.byteBuffer = byteBuffer;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+
+  private CompletableFuture<ByteBuffer> byteBuffer;
+  private long offset;
+  private int length;
+
+  public ParquetObjectRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) {
+    this.byteBuffer = byteBuffer;
+    this.offset = offset;
+    this.length = length;
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java b/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
index 290e9661da69..c7081a4009f9 100644
--- a/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
+++ b/api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
 
 /**
  * {@code SeekableInputStream} is an interface with the methods needed to read data from a file or
@@ -43,4 +46,14 @@ public abstract class SeekableInputStream extends InputStream {
    * @throws IOException If the underlying stream throws IOException
    */
   public abstract void seek(long newPos) throws IOException;
+
+  public void readVectored(List<ParquetObjectRange> ranges, IntFunction<ByteBuffer> allocate)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Default Iceberg stream does not support vectored reads");
+  }
+
+  public boolean readVectoredAvailable(IntFunction<ByteBuffer> allocate) {
+    return false;
+  }
 }
diff --git a/aws-bundle/build.gradle b/aws-bundle/build.gradle
index 9e937aa0edb7..d86a8d2a08a1 100644
--- a/aws-bundle/build.gradle
+++ b/aws-bundle/build.gradle
@@ -39,7 +39,7 @@ project(":iceberg-aws-bundle") {
     implementation "software.amazon.awssdk:dynamodb"
     implementation "software.amazon.awssdk:lakeformation"
-    implementation libs.analyticsaccelerator.s3
+    implementation(libs.analyticsaccelerator.s3)
   }
 
   shadowJar {
@@ -52,10 +52,6 @@ project(":iceberg-aws-bundle") {
       include 'NOTICE'
     }
 
-    dependencies {
exclude(dependency('org.slf4j:slf4j-api')) - } - // relocate AWS-specific versions relocate 'org.apache.http', 'org.apache.iceberg.aws.shaded.org.apache.http' relocate 'io.netty', 'org.apache.iceberg.aws.shaded.io.netty' diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java index 177a627efcab..52b61827048d 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogCommitFailure.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.aws.glue; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -28,6 +29,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.aws.AwsIntegTestUtil; +import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.aws.s3.S3TestUtil; import org.apache.iceberg.aws.util.RetryDetector; import org.apache.iceberg.catalog.TableIdentifier; @@ -160,6 +162,9 @@ public void testCheckCommitStatusAfterRetries() { public void testNoRetryAwarenessCorruptsTable() { // This test exists to replicate the issue the prior test validates the fix for // See https://github.com/apache/iceberg/issues/7151 + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(), + "Analytics Accelerator Library does not support custom Iceberg exception: NotFoundException"); String namespace = createNamespace(); String tableName = createTable(namespace); TableIdentifier tableId = TableIdentifier.of(namespace, tableName); diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java index 90557720fa0e..17dc42749fdc 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java @@ -26,6 +26,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.LegacyMd5Plugin; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -73,4 +75,16 @@ public static S3Client createS3Client(MinIOContainer container, boolean legacyMd builder.forcePathStyle(true); // OSX won't resolve subdomains return builder.build(); } + + public static S3AsyncClient createS3AsyncClient(MinIOContainer container) { + URI uri = URI.create(container.getS3URL()); + S3AsyncClientBuilder builder = S3AsyncClient.builder(); + builder.credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(container.getUserName(), container.getPassword()))); + builder.applyMutation(mutator -> mutator.endpointOverride(uri)); + builder.region(Region.US_EAST_1); + builder.forcePathStyle(true); // OSX won't resolve subdomains + return builder.build(); + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java index 8700da07c449..89bc5c6a2ef1 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java 
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java @@ -18,8 +18,15 @@ */ package org.apache.iceberg.aws.s3; +import static org.assertj.core.api.Assumptions.assumeThat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class S3TestUtil { + private static final Logger LOG = LoggerFactory.getLogger(S3TestUtil.class); + private S3TestUtil() {} public static String getBucketFromUri(String s3Uri) { @@ -29,4 +36,18 @@ public static String getBucketFromUri(String s3Uri) { public static String getKeyFromUri(String s3Uri) { return new S3URI(s3Uri).key(); } + + /** + * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled. + * + * @param properties properties to probe + */ + public static void skipIfAnalyticsAcceleratorEnabled( + S3FileIOProperties properties, String message) { + boolean isAcceleratorEnabled = properties.isS3AnalyticsAcceleratorEnabled(); + if (isAcceleratorEnabled) { + LOG.warn(message); + } + assumeThat(!isAcceleratorEnabled).describedAs(message).isTrue(); + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java index f98d1a3d4471..167cf66e9c9b 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestFlakyS3InputStream.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -27,18 +28,25 @@ import java.io.IOException; import java.io.InputStream; import java.net.SocketTimeoutException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import javax.net.ssl.SSLException; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; @@ -51,6 +59,8 @@ public class TestFlakyS3InputStream extends TestS3InputStream { + private final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(); + private AtomicInteger resetForRetryCounter; @BeforeEach @@ -59,7 +69,13 @@ public void setupTest() { } @Override - S3InputStream newInputStream(S3Client s3Client, S3URI uri) { + SeekableInputStream newInputStream(S3Client s3Client, S3AsyncClient s3AsyncClient, S3URI uri) { + if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) { + PrefixedS3Client 
client = + new PrefixedS3Client("s3", Map.of(), () -> s3Client, () -> s3AsyncClient); + return AnalyticsAcceleratorUtil.newStream( + S3InputFile.fromLocation(uri.location(), client, MetricsContext.nullMetrics())); + } return new S3InputStream(s3Client, uri) { @Override void resetForRetry() throws IOException { @@ -72,14 +88,24 @@ void resetForRetry() throws IOException { @ParameterizedTest @MethodSource("retryableExceptions") public void testReadWithFlakyStreamRetrySucceed(IOException exception) throws Exception { - testRead(flakyStreamClient(new AtomicInteger(3), exception)); + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level"); + testRead( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception)); assertThat(resetForRetryCounter.get()).isEqualTo(2); } @ParameterizedTest @MethodSource("retryableExceptions") public void testReadWithFlakyStreamExhaustedRetries(IOException exception) { - assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(5), exception))) + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level"); + assertThatThrownBy( + () -> + testRead( + flakyStreamClient(new AtomicInteger(5), exception), + flakyStreamAsyncClient(new AtomicInteger(5), exception))) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(3); @@ -88,7 +114,13 @@ public void testReadWithFlakyStreamExhaustedRetries(IOException exception) { @ParameterizedTest @MethodSource("nonRetryableExceptions") public void testReadWithFlakyStreamNonRetryableException(IOException exception) { - assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(3), exception))) + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator wraps IOException differently"); + assertThatThrownBy( + () -> + testRead( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception))) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(0); @@ -97,14 +129,24 @@ public void testReadWithFlakyStreamNonRetryableException(IOException exception) @ParameterizedTest @MethodSource("retryableExceptions") public void testSeekWithFlakyStreamRetrySucceed(IOException exception) throws Exception { - testSeek(flakyStreamClient(new AtomicInteger(3), exception)); + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level"); + testSeek( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception)); assertThat(resetForRetryCounter.get()).isEqualTo(2); } @ParameterizedTest @MethodSource("retryableExceptions") public void testSeekWithFlakyStreamExhaustedRetries(IOException exception) { - assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(5), exception))) + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level"); + assertThatThrownBy( + () -> + testSeek( + flakyStreamClient(new AtomicInteger(5), exception), + flakyStreamAsyncClient(new AtomicInteger(5), exception))) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(3); @@ -113,7 +155,13 @@ public void 
testSeekWithFlakyStreamExhaustedRetries(IOException exception) { @ParameterizedTest @MethodSource("nonRetryableExceptions") public void testSeekWithFlakyStreamNonRetryableException(IOException exception) { - assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(3), exception))) + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator wraps IOException differently"); + assertThatThrownBy( + () -> + testSeek( + flakyStreamClient(new AtomicInteger(3), exception), + flakyStreamAsyncClient(new AtomicInteger(3), exception))) .isInstanceOf(exception.getClass()) .hasMessage(exception.getMessage()); assertThat(resetForRetryCounter.get()).isEqualTo(0); @@ -138,6 +186,14 @@ private S3ClientWrapper flakyStreamClient(AtomicInteger counter, IOException fai return flakyClient; } + private S3AsyncClientWrapper flakyStreamAsyncClient(AtomicInteger counter, IOException failure) { + S3AsyncClientWrapper flakyClient = spy(new S3AsyncClientWrapper(s3AsyncClient())); + doAnswer(invocation -> new FlakyInputStream(invocation.callRealMethod(), counter, failure)) + .when(flakyClient) + .getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)); + return flakyClient; + } + /** Wrapper for S3 client, used to mock the final class DefaultS3Client */ public static class S3ClientWrapper implements S3Client { @@ -184,6 +240,50 @@ public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest } } + /** Wrapper for S3 Async client, used to mock the final class DefaultS3AsyncClient */ + public static class S3AsyncClientWrapper implements S3AsyncClient { + + private final S3AsyncClient delegate; + + public S3AsyncClientWrapper(S3AsyncClient delegate) { + this.delegate = delegate; + } + + @Override + public String serviceName() { + return delegate.serviceName(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public CompletableFuture getObject( + GetObjectRequest getObjectRequest, + AsyncResponseTransformer asyncResponseTransformer) { + return delegate.getObject(getObjectRequest, asyncResponseTransformer); + } + + @Override + public CompletableFuture headObject(HeadObjectRequest headObjectRequest) { + return delegate.headObject(headObjectRequest); + } + + @Override + public CompletableFuture putObject( + PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) { + return delegate.putObject(putObjectRequest, requestBody); + } + + @Override + public CompletableFuture createBucket( + CreateBucketRequest createBucketRequest) { + return delegate.createBucket(createBucketRequest); + } + } + static class FlakyInputStream extends InputStream { private final ResponseInputStream delegate; private final AtomicInteger counter; diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java index 9955aa7f8459..4f56455afcc1 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestMinioUtil.java @@ -22,12 +22,16 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.UUID; +import java.util.concurrent.CompletionException; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import 
software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateBucketResponse; @@ -72,4 +76,41 @@ void validateS3ConditionalWrites() { String responseBody = getResponse.asUtf8String(); assertThat(responseBody).isEqualTo("test-payload-0"); } + + @Test + void validateS3ConditionalWritesUsingAsyncClient() { + S3AsyncClient s3AsyncClient = MinioUtil.createS3AsyncClient(MINIO); + + String bucket = "test-bucket-" + UUID.randomUUID(); + + CreateBucketResponse createBucketResponse = + s3AsyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).join(); + assertThat(createBucketResponse.sdkHttpResponse().isSuccessful()).isTrue(); + + String key = "test-key-" + UUID.randomUUID().toString(); + for (int i = 0; i < 5; i++) { + String payload = "test-payload-" + i; + PutObjectRequest request = + PutObjectRequest.builder().bucket(bucket).key(key).ifNoneMatch("*").build(); + AsyncRequestBody body = AsyncRequestBody.fromString(payload); + if (i == 0) { + PutObjectResponse response = s3AsyncClient.putObject(request, body).join(); + assertThat(response.sdkHttpResponse().isSuccessful()).isTrue(); + } else { + assertThatThrownBy(() -> s3AsyncClient.putObject(request, body).join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(S3Exception.class) + .hasMessageContaining("Service: S3, Status Code: 412") + .hasMessageContaining("At least one of the pre-conditions you specified did not hold"); + } + } + + String responseBody = + s3AsyncClient + .getObject( + request -> request.bucket(bucket).key(key), AsyncResponseTransformer.toBytes()) + .join() + .asUtf8String(); + assertThat(responseBody).isEqualTo("test-payload-0"); + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index ebee07e53e4c..8a477dd9c26a 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.AdditionalAnswers.delegatesTo; @@ -119,7 +120,10 @@ public class TestS3FileIO { private final SerializableSupplier s3 = () -> MinioUtil.createS3Client(minio, legacyMd5PluginEnabled()); + private final SerializableSupplier s3Async = + () -> MinioUtil.createS3AsyncClient(minio); private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3.get())); + private final S3AsyncClient s3Asyncmock = mock(S3AsyncClient.class, delegatesTo(s3Async.get())); private final Random random = new Random(1); private final int numBucketsForBatchDeletion = 3; private final String batchDeletionBucketPrefix = "batch-delete-"; @@ -148,13 +152,14 @@ protected boolean legacyMd5PluginEnabled() { @BeforeEach public void before() { - s3FileIO = new S3FileIO(() -> s3mock); + s3FileIO = new S3FileIO(() -> s3mock, () -> s3Asyncmock); s3FileIO.initialize(properties); createBucket(S3_GENERAL_PURPOSE_BUCKET); for (int i = 1; 
i <= numBucketsForBatchDeletion; i++) { createBucket(batchDeletionBucketPrefix + i); } StaticClientFactory.client = s3mock; + StaticClientFactory.asyncClient = s3Asyncmock; } @AfterEach @@ -398,6 +403,9 @@ public void testPrefixDelete() { @Test public void testReadMissingLocation() { + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(properties), + "Analytics Accelerator Library does not support custom Iceberg exception: NotFoundException"); String location = "s3://bucket/path/to/data.parquet"; InputFile in = s3FileIO.newInputFile(location); @@ -408,6 +416,9 @@ public void testReadMissingLocation() { @Test public void testMissingTableMetadata() { + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(properties), + "Analytics Accelerator Library does not support custom Iceberg exception: NotFoundException"); Map conf = Maps.newHashMap(); conf.put( CatalogProperties.URI, diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java index 249279054baf..f12159a020b2 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; @@ -209,8 +210,12 @@ public void testNewInputStreamWithAccessPoint() throws Exception { @Test public void testCrossRegionAccessEnabled() throws Exception { - clientFactory.initialize( - ImmutableMap.of(S3FileIOProperties.CROSS_REGION_ACCESS_ENABLED, "true")); + Map properties = + ImmutableMap.of(S3FileIOProperties.CROSS_REGION_ACCESS_ENABLED, "true"); + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(properties), + "S3 Async Clients needed for Analytics Accelerator Library does not support Cross Region Access"); + clientFactory.initialize(properties); S3Client s3Client = clientFactory.s3(); String crossBucketObjectKey = String.format("%s/%s", prefix, UUID.randomUUID()); String crossBucketObjectUri = @@ -234,7 +239,12 @@ public void testCrossRegionAccessEnabled() throws Exception { @Test public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception { requireAccessPointSupport(); - clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true")); + Map properties = + ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"); + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(properties), + "S3 Async Clients needed for Analytics Accelerator Library does not support Cross Region Access Points"); + clientFactory.initialize(properties); S3Client s3Client = clientFactory.s3(); s3Client.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), @@ -255,10 +265,14 @@ public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception { validateRead(s3FileIO); } - @Test - public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception { + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testNewInputStreamWithMultiRegionAccessPoint(Map aalProperties) + throws Exception { assumeThat(multiRegionAccessPointAlias).isNotEmpty(); - 
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true")); + Map testProperties = + ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true"); + clientFactory.initialize(mergeProperties(aalProperties, testProperties)); S3Client s3Client = clientFactory.s3(); s3Client.putObject( PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(), @@ -485,14 +499,18 @@ public void testServerSideCustomEncryption() throws Exception { new String(encoder.encode(digest.digest(secretKey.getEncoded())), StandardCharsets.UTF_8); S3FileIO s3FileIO = new S3FileIO(clientFactory::s3); - s3FileIO.initialize( + Map properties = ImmutableMap.of( S3FileIOProperties.SSE_TYPE, S3FileIOProperties.SSE_TYPE_CUSTOM, S3FileIOProperties.SSE_KEY, encodedKey, S3FileIOProperties.SSE_MD5, - md5)); + md5); + skipIfAnalyticsAcceleratorEnabled( + new S3FileIOProperties(properties), + "Analytics Accelerator Library does not support Server Side Custom Encryption"); + s3FileIO.initialize(properties); write(s3FileIO); validateRead(s3FileIO); GetObjectResponse response = diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java index f8903842df37..3f96d7d05912 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3InputStream.java @@ -18,21 +18,32 @@ */ package org.apache.iceberg.aws.s3; +import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.ParquetObjectRange; import org.apache.iceberg.io.RangeReadable; import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException; import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException; @@ -44,7 +55,9 @@ public class TestS3InputStream { @Container private static final MinIOContainer MINIO = MinioUtil.createContainer(); private final S3Client s3 = MinioUtil.createS3Client(MINIO); + private final S3AsyncClient s3Async = MinioUtil.createS3AsyncClient(MINIO); private final Random random = new Random(1); + private final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(); @BeforeEach public void before() { @@ -53,21 +66,27 @@ public void before() { @Test public void testRead() throws Exception { - testRead(s3); + testRead(s3, s3Async); } - S3InputStream newInputStream(S3Client s3Client, S3URI uri) { + SeekableInputStream 
newInputStream(S3Client s3Client, S3AsyncClient s3AsyncClient, S3URI uri) { + if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) { + PrefixedS3Client client = + new PrefixedS3Client("s3", Map.of(), () -> s3Client, () -> s3AsyncClient); + return AnalyticsAcceleratorUtil.newStream( + S3InputFile.fromLocation(uri.location(), client, MetricsContext.nullMetrics())); + } return new S3InputStream(s3Client, uri); } - protected void testRead(S3Client s3Client) throws Exception { + protected void testRead(S3Client s3Client, S3AsyncClient s3AsyncClient) throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/read.dat"); int dataSize = 1024 * 1024 * 10; byte[] data = randomData(dataSize); writeS3Data(uri, data); - try (SeekableInputStream in = newInputStream(s3Client, uri)) { + try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri)) { int readSize = 1024; readAndCheck(in, in.getPos(), readSize, data, false); readAndCheck(in, in.getPos(), readSize, data, true); @@ -116,10 +135,12 @@ private void readAndCheck( @Test public void testRangeRead() throws Exception { - testRangeRead(s3); + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, "Analytics Accelerator Library does not support range reads"); + testRangeRead(s3, s3Async); } - protected void testRangeRead(S3Client s3Client) throws Exception { + protected void testRangeRead(S3Client s3Client, S3AsyncClient s3AsyncClient) throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat"); int dataSize = 1024 * 1024 * 10; byte[] expected = randomData(dataSize); @@ -131,7 +152,7 @@ protected void testRangeRead(S3Client s3Client) throws Exception { writeS3Data(uri, expected); - try (RangeReadable in = newInputStream(s3Client, uri)) { + try (RangeReadable in = (RangeReadable) newInputStream(s3Client, s3AsyncClient, uri)) { // first 1k position = 0; offset = 0; @@ -162,8 +183,11 @@ private void readAndCheckRanges( @Test public void testClose() throws Exception { + skipIfAnalyticsAcceleratorEnabled( + s3FileIOProperties, + "Analytics Accelerator Library has different exception handling when closed"); S3URI uri = new S3URI("s3://bucket/path/to/closed.dat"); - SeekableInputStream closed = newInputStream(s3, uri); + SeekableInputStream closed = newInputStream(s3, s3Async, uri); closed.close(); assertThatThrownBy(() -> closed.seek(0)) .isInstanceOf(IllegalStateException.class) @@ -172,16 +196,16 @@ public void testClose() throws Exception { @Test public void testSeek() throws Exception { - testSeek(s3); + testSeek(s3, s3Async); } - protected void testSeek(S3Client s3Client) throws Exception { + protected void testSeek(S3Client s3Client, S3AsyncClient s3AsyncClient) throws Exception { S3URI uri = new S3URI("s3://bucket/path/to/seek.dat"); byte[] expected = randomData(1024 * 1024); writeS3Data(uri, expected); - try (SeekableInputStream in = newInputStream(s3Client, uri)) { + try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri)) { in.seek(expected.length / 2); byte[] actual = new byte[expected.length / 2]; IOUtil.readFully(in, actual, 0, expected.length / 2); @@ -190,6 +214,143 @@ protected void testSeek(S3Client s3Client) throws Exception { } } + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testVectoredRead(Map aalProperties) throws Exception { + testVectoredRead(s3, s3Async, aalProperties); + } + + protected void testVectoredRead( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws 
Exception { + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + assumeThat(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()).isTrue(); + + S3URI uri = new S3URI("s3://bucket/path/to/vectored-read.dat"); + int dataSize = 1024 * 1024 * 10; + byte[] data = randomData(dataSize); + + writeS3Data(uri, data); + + try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) { + IntFunction allocate = ByteBuffer::allocate; + assertThat(in.readVectoredAvailable(allocate)).isTrue(); + + List ranges = Lists.newArrayList(); + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + CompletableFuture future3 = new CompletableFuture<>(); + + // First range: first 1024 bytes + int range1Offset = 0; + int range1Length = 1024; + ranges.add(new ParquetObjectRange(future1, range1Offset, range1Length)); + + // Second range: middle 2048 bytes + int range2Offset = dataSize / 2; + int range2Length = 2048; + ranges.add(new ParquetObjectRange(future2, range2Offset, range2Length)); + + // Third range: last 1024 bytes + int range3Offset = dataSize - 1024; + int range3Length = 1024; + ranges.add(new ParquetObjectRange(future3, range3Offset, range3Length)); + + in.readVectored(ranges, allocate); + + ByteBuffer buffer1 = future1.get(); + ByteBuffer buffer2 = future2.get(); + ByteBuffer buffer3 = future3.get(); + + assertThat(future1.isDone()).isTrue(); + assertThat(future2.isDone()).isTrue(); + assertThat(future3.isDone()).isTrue(); + + assertThat(buffer1.limit()).isEqualTo(range1Length); + assertThat(buffer2.limit()).isEqualTo(range2Length); + assertThat(buffer3.limit()).isEqualTo(range3Length); + + byte[] range1Data = new byte[range1Length]; + byte[] range2Data = new byte[range2Length]; + byte[] range3Data = new byte[range3Length]; + + buffer1.get(range1Data); + buffer2.get(range2Data); + buffer3.get(range3Data); + + assertThat(range1Data) + .isEqualTo(Arrays.copyOfRange(data, range1Offset, range1Offset + range1Length)); + assertThat(range2Data) + .isEqualTo(Arrays.copyOfRange(data, range2Offset, range2Offset + range2Length)); + assertThat(range3Data) + .isEqualTo(Arrays.copyOfRange(data, range3Offset, range3Offset + range3Length)); + } + } + + @ParameterizedTest + @MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties") + public void testVectoredReadWithNonNonContinuousRanges(Map aalProperties) + throws Exception { + testVectoredReadWithNonContinuousRanges(s3, s3Async, aalProperties); + } + + protected void testVectoredReadWithNonContinuousRanges( + S3Client s3Client, S3AsyncClient s3AsyncClient, Map aalProperties) + throws Exception { + + final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(aalProperties); + assumeThat(s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()).isTrue(); + + S3URI uri = new S3URI("s3://bucket/path/to/vectored-read-overlapping.dat"); + int dataSize = 1024 * 1024; + byte[] data = randomData(dataSize); + + writeS3Data(uri, data); + + try (SeekableInputStream in = newInputStream(s3Client, s3AsyncClient, uri, aalProperties)) { + List ranges = Lists.newArrayList(); + CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future2 = new CompletableFuture<>(); + + // First range: 0-1024 + int range1Offset = 0; + int range1Length = 1024; + ranges.add(new ParquetObjectRange(future1, range1Offset, range1Length)); + + // Second range: 2000-3400 + int range2Offset = 2000; + int range2Length = 3400; + 
ranges.add(new ParquetObjectRange(future2, range2Offset, range2Length)); + + // Call readVectored + IntFunction allocate = ByteBuffer::allocate; + in.readVectored(ranges, allocate); + + // Verify the buffers have the expected content + ByteBuffer buffer1 = future1.get(); + ByteBuffer buffer2 = future2.get(); + + // Verify the futures were completed + assertThat(future1.isDone()).isTrue(); + assertThat(future2.isDone()).isTrue(); + + assertThat(buffer1.limit()).isEqualTo(range1Length); + assertThat(buffer2.limit()).isEqualTo(range2Length); + + // Verify the buffer content matches the original data + byte[] range1Data = new byte[range1Length]; + byte[] range2Data = new byte[range2Length]; + + buffer1.get(range1Data); + buffer2.get(range2Data); + + assertThat(range1Data) + .isEqualTo(Arrays.copyOfRange(data, range1Offset, range1Offset + range1Length)); + assertThat(range2Data) + .isEqualTo(Arrays.copyOfRange(data, range2Offset, range2Offset + range2Length)); + } + } + private byte[] randomData(int size) { byte[] data = new byte[size]; random.nextBytes(data); @@ -217,4 +378,8 @@ private void createBucket(String bucketName) { protected S3Client s3Client() { return s3; } + + protected S3AsyncClient s3AsyncClient() { + return s3Async; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java index 197f79905f0f..86f3de663444 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorInputStreamWrapper.java @@ -19,11 +19,21 @@ package org.apache.iceberg.aws.s3; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.io.ParquetObjectRange; import org.apache.iceberg.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; /** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */ class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream { + private static final Logger LOG = + LoggerFactory.getLogger(AnalyticsAcceleratorInputStreamWrapper.class); private final S3SeekableInputStream delegate; @@ -60,4 +70,29 @@ public long getPos() { public void close() throws IOException { this.delegate.close(); } + + private static final Consumer LOG_BYTE_BUFFER_RELEASED = + buffer -> LOG.info("Release buffer of length {}: {}", buffer.limit(), buffer); + + @Override + public void readVectored(List ranges, IntFunction allocate) + throws IOException { + LOG.info("Read vectored ranges: {}", ranges); + this.delegate.readVectored(convertRanges(ranges), allocate, LOG_BYTE_BUFFER_RELEASED); + } + + public static List convertRanges( + List ranges) { + return ranges.stream() + .map( + range -> + new software.amazon.s3.analyticsaccelerator.common.ObjectRange( + range.getByteBuffer(), range.getOffset(), range.getLength())) + .collect(Collectors.toList()); + } + + @Override + public boolean readVectoredAvailable(IntFunction allocate) { + return true; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java index ed1c3a4879e3..d9a451e6c21b 100644 --- 
a/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/AnalyticsAcceleratorUtil.java
@@ -27,13 +27,13 @@ import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration;
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
 import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
@@ -44,13 +44,13 @@ class AnalyticsAcceleratorUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(AnalyticsAcceleratorUtil.class);
 
-  private static final Cache<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>
+  private static final Cache<Pair<S3Client, S3FileIOProperties>, S3SeekableInputStreamFactory>
       STREAM_FACTORY_CACHE =
           Caffeine.newBuilder()
               .maximumSize(100)
               .removalListener(
                   (RemovalListener<
-                          Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>)
+                          Pair<S3Client, S3FileIOProperties>, S3SeekableInputStreamFactory>)
                       (key, factory, cause) -> close(factory))
               .build();
@@ -70,7 +70,7 @@ public static SeekableInputStream newStream(S3InputFile inputFile) {
 
     S3SeekableInputStreamFactory factory =
         STREAM_FACTORY_CACHE.get(
-            Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()),
+            Pair.of(inputFile.client(), inputFile.s3FileIOProperties()),
             AnalyticsAcceleratorUtil::createNewFactory);
 
     try {
@@ -83,7 +83,7 @@ public static SeekableInputStream newStream(S3InputFile inputFile) {
   }
 
   private static S3SeekableInputStreamFactory createNewFactory(
-      Pair<S3AsyncClient, S3FileIOProperties> cacheKey) {
+      Pair<S3Client, S3FileIOProperties> cacheKey) {
     ConnectorConfiguration connectorConfiguration =
         new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties());
     S3SeekableInputStreamConfiguration streamConfiguration =
@@ -91,7 +91,10 @@ private static S3SeekableInputStreamFactory createNewFactory(
     ObjectClientConfiguration objectClientConfiguration =
         ObjectClientConfiguration.fromConfiguration(connectorConfiguration);
 
-    ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration);
+    // Use the existing S3Client from the cache key
+    S3Client s3Client = cacheKey.first();
+
+    ObjectClient objectClient = new S3SyncSdkObjectClient(s3Client, objectClientConfiguration);
 
     return new S3SeekableInputStreamFactory(objectClient, streamConfiguration);
   }
@@ -105,8 +108,7 @@ private static void close(S3SeekableInputStreamFactory factory) {
     }
   }
 
-  public static void cleanupCache(
-      S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) {
-    STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties));
+  public static void cleanupCache(S3Client client, S3FileIOProperties s3FileIOProperties) {
+    STREAM_FACTORY_CACHE.invalidate(Pair.of(client, s3FileIOProperties));
   }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java b/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java
index 400792cf976c..879f04a43003 100644
---
a/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/PrefixedS3Client.java @@ -109,14 +109,14 @@ public S3FileIOProperties s3FileIOProperties() { @Override public void close() { if (null != s3Client) { + // cleanup usage in analytics accelerator if any + if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) { + AnalyticsAcceleratorUtil.cleanupCache(s3Client, s3FileIOProperties); + } s3Client.close(); } if (null != s3AsyncClient) { - // cleanup usage in analytics accelerator if any - if (s3FileIOProperties().isS3AnalyticsAcceleratorEnabled()) { - AnalyticsAcceleratorUtil.cleanupCache(s3AsyncClient, s3FileIOProperties); - } s3AsyncClient.close(); } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java index 76f47adb17e8..d9cb2871f825 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/StaticClientFactory.java @@ -28,6 +28,7 @@ class StaticClientFactory implements AwsClientFactory { static S3Client client; + static S3AsyncClient asyncClient; @Override public S3Client s3() { @@ -36,7 +37,7 @@ public S3Client s3() { @Override public S3AsyncClient s3Async() { - return null; + return asyncClient; } @Override diff --git a/build.gradle b/build.gradle index bf09d1b492ff..ca1a9ee23d99 100644 --- a/build.gradle +++ b/build.gradle @@ -118,8 +118,8 @@ allprojects { group = "org.apache.iceberg" version = projectVersion repositories { - mavenCentral() mavenLocal() + mavenCentral() } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 77baf8a6a97b..928e174b68c7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -22,7 +22,7 @@ [versions] activation = "1.1.1" aliyun-sdk-oss = "3.10.2" -analyticsaccelerator = "1.0.0" +analyticsaccelerator = "SNAPSHOT" antlr = "4.9.3" antlr413 = "4.13.1" # For Spark 4.0 support aircompressor = "0.27" diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 6f68fbe150ff..bd5ae324cec5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1339,7 +1339,6 @@ public CloseableIterable build() { if (fileDecryptionProperties != null) { optionsBuilder.withDecryption(fileDecryptionProperties); } - ParquetReadOptions options = optionsBuilder.build(); NameMapping mapping; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java index b36acdbd6f8d..8ee9b41ec629 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java @@ -21,6 +21,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -29,11 +34,14 @@ import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.DelegatingInputStream; import org.apache.iceberg.io.DelegatingOutputStream; +import 
org.apache.iceberg.io.ParquetObjectRange; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.io.DelegatingPositionOutputStream; import org.apache.parquet.io.DelegatingSeekableInputStream; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.ParquetFileRange; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; @@ -121,6 +129,32 @@ public long getPos() throws IOException { public void seek(long newPos) throws IOException { delegate.seek(newPos); } + + @Override + public boolean readVectoredAvailable(ByteBufferAllocator allocate) { + IntFunction delegateAllocate = (allocate::allocate); + return delegate.readVectoredAvailable(delegateAllocate); + } + + @Override + public void readVectored(List ranges, ByteBufferAllocator allocate) + throws IOException { + IntFunction delegateAllocate = (allocate::allocate); + List delegateRange = convertRanges(ranges); + delegate.readVectored(delegateRange, delegateAllocate); + } + + private static List convertRanges(List ranges) { + return ranges.stream() + .map( + parquetFileRange -> { + CompletableFuture result = new CompletableFuture<>(); + parquetFileRange.setDataReadFuture(result); + return new ParquetObjectRange( + result, parquetFileRange.getOffset(), parquetFileRange.getLength()); + }) + .collect(Collectors.toList()); + } } private static class ParquetOutputStreamAdapter extends DelegatingPositionOutputStream {
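
For reviewers, a minimal end-to-end sketch of the vectored-read surface this patch adds (SeekableInputStream.readVectored plus ParquetObjectRange). The class name VectoredReadExample, the method readTwoRanges, and the offsets and lengths are hypothetical and only illustrate how a caller such as the ParquetIO adapter above could drive the API; this is not Iceberg's actual Parquet reader wiring.

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.ParquetObjectRange;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Hypothetical caller: issues two ranged reads against a single stream and
// waits on the futures that readVectored() completes.
public class VectoredReadExample {
  public static byte[][] readTwoRanges(InputFile file) throws Exception {
    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
    try (SeekableInputStream in = file.newStream()) {
      if (!in.readVectoredAvailable(allocate)) {
        // Streams without vectored-read support fall back to the base-class behavior.
        throw new UnsupportedOperationException("Vectored reads not enabled for this stream");
      }

      CompletableFuture<ByteBuffer> firstChunk = new CompletableFuture<>();
      CompletableFuture<ByteBuffer> tailChunk = new CompletableFuture<>();
      // Offsets and lengths below are illustrative only.
      List<ParquetObjectRange> ranges =
          Lists.newArrayList(
              new ParquetObjectRange(firstChunk, 4L, 1024),
              new ParquetObjectRange(tailChunk, file.getLength() - 1024, 1024));

      // Completes the futures attached to each range.
      in.readVectored(ranges, allocate);

      ByteBuffer head = firstChunk.get();
      ByteBuffer tail = tailChunk.get();
      byte[] headBytes = new byte[head.remaining()];
      byte[] tailBytes = new byte[tail.remaining()];
      head.get(headBytes);
      tail.get(tailBytes);
      return new byte[][] {headBytes, tailBytes};
    }
  }
}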