58 changes: 58 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
 * A byte range of a Parquet object, paired with a future that is completed with the range's data
 * once it has been read.
 */
public class ParquetObjectRange {
  private CompletableFuture<ByteBuffer> byteBuffer;
  private long offset;
  private int length;

  public ParquetObjectRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) {
    this.byteBuffer = byteBuffer;
    this.offset = offset;
    this.length = length;
  }

  public CompletableFuture<ByteBuffer> getByteBuffer() {
    return byteBuffer;
  }

  public void setByteBuffer(CompletableFuture<ByteBuffer> byteBuffer) {
    this.byteBuffer = byteBuffer;
  }

  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  public int getLength() {
    return length;
  }

  public void setLength(int length) {
    this.length = length;
  }
}
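A minimal usage sketch, not part of this diff: the caller creates each range with an empty future, hands the ranges to a vectored read, and then waits on the futures. The class name, offsets, and lengths below are illustrative.

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.iceberg.io.ParquetObjectRange;
import org.apache.iceberg.io.SeekableInputStream;

class ParquetObjectRangeExample {
  // Reads two arbitrary ranges; a real caller would derive offsets from Parquet metadata.
  static ByteBuffer readTwoRanges(SeekableInputStream stream) throws Exception {
    ParquetObjectRange firstPage = new ParquetObjectRange(new CompletableFuture<>(), 4L, 4_096);
    ParquetObjectRange footer = new ParquetObjectRange(new CompletableFuture<>(), 1_048_576L, 8_192);

    // The stream fills each buffer and completes the corresponding future.
    stream.readVectored(List.of(firstPage, footer), ByteBuffer::allocate);

    return footer.getByteBuffer().get(); // blocks until the footer range has been read
  }
}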
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
@@ -20,6 +20,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;

/**
* {@code SeekableInputStream} is an interface with the methods needed to read data from a file or
@@ -43,4 +46,14 @@ public abstract class SeekableInputStream extends InputStream {
* @throws IOException If the underlying stream throws IOException
*/
public abstract void seek(long newPos) throws IOException;

  /**
   * Read a set of ranges in one vectored operation; each range's future is completed once its
   * bytes are available.
   *
   * @throws IOException If the underlying stream throws IOException
   */
  public void readVectored(List<ParquetObjectRange> ranges, IntFunction<ByteBuffer> allocate)
      throws IOException {
    throw new UnsupportedOperationException(
        "Default Iceberg stream does not support vectored reads");
  }

  /** Whether this stream can serve {@link #readVectored(List, IntFunction)} calls. */
  public boolean readVectoredAvailable(IntFunction<ByteBuffer> allocate) {
    return false;
  }
}
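For orientation, one way a concrete subclass could honor this contract is a naive sequential fallback. The sketch below is illustrative only, not part of this diff; it would live in a subclass that implements seek, and it assumes java.io.EOFException is imported alongside the imports added above.

@Override
public void readVectored(List<ParquetObjectRange> ranges, IntFunction<ByteBuffer> allocate)
    throws IOException {
  for (ParquetObjectRange range : ranges) {
    ByteBuffer buffer = allocate.apply(range.getLength());
    byte[] chunk = new byte[range.getLength()];
    seek(range.getOffset());
    int filled = 0;
    while (filled < chunk.length) {
      int n = read(chunk, filled, chunk.length - filled);
      if (n < 0) {
        // not enough bytes left in the stream to satisfy this range
        range.getByteBuffer().completeExceptionally(
            new EOFException("Stream ended before range at offset " + range.getOffset()));
        return;
      }
      filled += n;
    }
    buffer.put(chunk);
    buffer.flip();
    range.getByteBuffer().complete(buffer);
  }
}

A production implementation would instead coalesce nearby ranges and issue the reads in parallel, which is the point of the vectored API.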
6 changes: 1 addition & 5 deletions aws-bundle/build.gradle
@@ -39,7 +39,7 @@ project(":iceberg-aws-bundle") {
implementation "software.amazon.awssdk:dynamodb"
implementation "software.amazon.awssdk:lakeformation"

implementation libs.analyticsaccelerator.s3
implementation(libs.analyticsaccelerator.s3)
}

shadowJar {
@@ -52,10 +52,6 @@
include 'NOTICE'
}

dependencies {
exclude(dependency('org.slf4j:slf4j-api'))
}

// relocate AWS-specific versions
relocate 'org.apache.http', 'org.apache.iceberg.aws.shaded.org.apache.http'
relocate 'io.netty', 'org.apache.iceberg.aws.shaded.io.netty'
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.glue;

import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -28,6 +29,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.aws.s3.S3TestUtil;
import org.apache.iceberg.aws.util.RetryDetector;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -160,6 +162,9 @@ public void testCheckCommitStatusAfterRetries() {
public void testNoRetryAwarenessCorruptsTable() {
// This test exists to replicate the issue the prior test validates the fix for
// See https://github.com/apache/iceberg/issues/7151
skipIfAnalyticsAcceleratorEnabled(
new S3FileIOProperties(),
"Analytics Accelerator Library does not support custom Iceberg exception: NotFoundException");
String namespace = createNamespace();
String tableName = createTable(namespace);
TableIdentifier tableId = TableIdentifier.of(namespace, tableName);
14 changes: 14 additions & 0 deletions aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java
@@ -26,6 +26,8 @@
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

@@ -73,4 +75,16 @@ public static S3Client createS3Client(MinIOContainer container, boolean legacyMd
builder.forcePathStyle(true); // OSX won't resolve subdomains
return builder.build();
}

public static S3AsyncClient createS3AsyncClient(MinIOContainer container) {
URI uri = URI.create(container.getS3URL());
S3AsyncClientBuilder builder = S3AsyncClient.builder();
builder.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(container.getUserName(), container.getPassword())));
builder.applyMutation(mutator -> mutator.endpointOverride(uri));
builder.region(Region.US_EAST_1);
builder.forcePathStyle(true); // OSX won't resolve subdomains
return builder.build();
}
}
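A hypothetical wiring sketch for the new factory; the MinIO image tag and bucket name are illustrative, and testcontainers' MinIOContainer is assumed to be on the classpath, as it already is for these integration tests.

import org.testcontainers.containers.MinIOContainer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

class MinioAsyncClientExample {
  public static void main(String[] args) {
    try (MinIOContainer container = new MinIOContainer("minio/minio:latest")) {
      container.start();
      try (S3AsyncClient s3 = MinioUtil.createS3AsyncClient(container)) {
        // Async calls return CompletableFutures; join() blocks for the demo.
        s3.createBucket(CreateBucketRequest.builder().bucket("demo-bucket").build()).join();
      }
    }
  }
}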
21 changes: 21 additions & 0 deletions aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java
@@ -18,8 +18,15 @@
*/
package org.apache.iceberg.aws.s3;

import static org.assertj.core.api.Assumptions.assumeThat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3TestUtil {

private static final Logger LOG = LoggerFactory.getLogger(S3TestUtil.class);

private S3TestUtil() {}

public static String getBucketFromUri(String s3Uri) {
@@ -29,4 +36,18 @@ public static String getBucketFromUri(String s3Uri) {
public static String getKeyFromUri(String s3Uri) {
return new S3URI(s3Uri).key();
}

  /**
   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
   *
   * @param properties properties to probe
   * @param message reason for skipping, logged as a warning and used as the assumption description
   */
public static void skipIfAnalyticsAcceleratorEnabled(
S3FileIOProperties properties, String message) {
boolean isAcceleratorEnabled = properties.isS3AnalyticsAcceleratorEnabled();
if (isAcceleratorEnabled) {
LOG.warn(message);
}
assumeThat(!isAcceleratorEnabled).describedAs(message).isTrue();
}
}
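Call sites follow the pattern used in the Glue test above; a sketch of the idiom (the test name and message are illustrative):

@Test
public void testBehaviorUnsupportedByAccelerator() {
  // Converted to a JUnit assumption: the test is skipped rather than failed.
  skipIfAnalyticsAcceleratorEnabled(
      new S3FileIOProperties(),
      "Analytics Accelerator Library does not support this code path");
  // ... remainder runs only when the accelerator is disabled ...
}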
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.aws.s3;

import static org.apache.iceberg.aws.s3.S3TestUtil.skipIfAnalyticsAcceleratorEnabled;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -27,18 +28,25 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.net.ssl.SSLException;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
@@ -51,6 +59,8 @@

public class TestFlakyS3InputStream extends TestS3InputStream {

private final S3FileIOProperties s3FileIOProperties = new S3FileIOProperties();

private AtomicInteger resetForRetryCounter;

@BeforeEach
@@ -59,7 +69,13 @@ public void setupTest() {
}

@Override
S3InputStream newInputStream(S3Client s3Client, S3URI uri) {
SeekableInputStream newInputStream(S3Client s3Client, S3AsyncClient s3AsyncClient, S3URI uri) {
if (s3FileIOProperties.isS3AnalyticsAcceleratorEnabled()) {
PrefixedS3Client client =
new PrefixedS3Client("s3", Map.of(), () -> s3Client, () -> s3AsyncClient);
return AnalyticsAcceleratorUtil.newStream(
S3InputFile.fromLocation(uri.location(), client, MetricsContext.nullMetrics()));
}
return new S3InputStream(s3Client, uri) {
@Override
void resetForRetry() throws IOException {
@@ -72,14 +88,24 @@ void resetForRetry() throws IOException {
@ParameterizedTest
@MethodSource("retryableExceptions")
public void testReadWithFlakyStreamRetrySucceed(IOException exception) throws Exception {
testRead(flakyStreamClient(new AtomicInteger(3), exception));
skipIfAnalyticsAcceleratorEnabled(
s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level");
testRead(
flakyStreamClient(new AtomicInteger(3), exception),
flakyStreamAsyncClient(new AtomicInteger(3), exception));
assertThat(resetForRetryCounter.get()).isEqualTo(2);
}

@ParameterizedTest
@MethodSource("retryableExceptions")
public void testReadWithFlakyStreamExhaustedRetries(IOException exception) {
assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(5), exception)))
skipIfAnalyticsAcceleratorEnabled(
s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level");
assertThatThrownBy(
() ->
testRead(
flakyStreamClient(new AtomicInteger(5), exception),
flakyStreamAsyncClient(new AtomicInteger(5), exception)))
.isInstanceOf(exception.getClass())
.hasMessage(exception.getMessage());
assertThat(resetForRetryCounter.get()).isEqualTo(3);
@@ -88,7 +114,13 @@ public void testReadWithFlakyStreamExhaustedRetries(IOException exception) {
@ParameterizedTest
@MethodSource("nonRetryableExceptions")
public void testReadWithFlakyStreamNonRetryableException(IOException exception) {
assertThatThrownBy(() -> testRead(flakyStreamClient(new AtomicInteger(3), exception)))
skipIfAnalyticsAcceleratorEnabled(
s3FileIOProperties, "Analytics Accelerator wraps IOException differently");
assertThatThrownBy(
() ->
testRead(
flakyStreamClient(new AtomicInteger(3), exception),
flakyStreamAsyncClient(new AtomicInteger(3), exception)))
.isInstanceOf(exception.getClass())
.hasMessage(exception.getMessage());
assertThat(resetForRetryCounter.get()).isEqualTo(0);
@@ -97,14 +129,24 @@ public void testReadWithFlakyStreamNonRetryableException(IOException exception)
@ParameterizedTest
@MethodSource("retryableExceptions")
public void testSeekWithFlakyStreamRetrySucceed(IOException exception) throws Exception {
testSeek(flakyStreamClient(new AtomicInteger(3), exception));
skipIfAnalyticsAcceleratorEnabled(
s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level");
testSeek(
flakyStreamClient(new AtomicInteger(3), exception),
flakyStreamAsyncClient(new AtomicInteger(3), exception));
assertThat(resetForRetryCounter.get()).isEqualTo(2);
}

@ParameterizedTest
@MethodSource("retryableExceptions")
public void testSeekWithFlakyStreamExhaustedRetries(IOException exception) {
assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(5), exception)))
skipIfAnalyticsAcceleratorEnabled(
s3FileIOProperties, "Analytics Accelerator Library does not support retries at read level");
assertThatThrownBy(
() ->
testSeek(
flakyStreamClient(new AtomicInteger(5), exception),
flakyStreamAsyncClient(new AtomicInteger(5), exception)))
.isInstanceOf(exception.getClass())
.hasMessage(exception.getMessage());
assertThat(resetForRetryCounter.get()).isEqualTo(3);
@@ -113,7 +155,13 @@ public void testSeekWithFlakyStreamExhaustedRetries(IOException exception) {
@ParameterizedTest
@MethodSource("nonRetryableExceptions")
public void testSeekWithFlakyStreamNonRetryableException(IOException exception) {
assertThatThrownBy(() -> testSeek(flakyStreamClient(new AtomicInteger(3), exception)))
skipIfAnalyticsAcceleratorEnabled(
s3FileIOProperties, "Analytics Accelerator wraps IOException differently");
assertThatThrownBy(
() ->
testSeek(
flakyStreamClient(new AtomicInteger(3), exception),
flakyStreamAsyncClient(new AtomicInteger(3), exception)))
.isInstanceOf(exception.getClass())
.hasMessage(exception.getMessage());
assertThat(resetForRetryCounter.get()).isEqualTo(0);
Expand All @@ -138,6 +186,14 @@ private S3ClientWrapper flakyStreamClient(AtomicInteger counter, IOException fai
return flakyClient;
}

private S3AsyncClientWrapper flakyStreamAsyncClient(AtomicInteger counter, IOException failure) {
S3AsyncClientWrapper flakyClient = spy(new S3AsyncClientWrapper(s3AsyncClient()));
doAnswer(invocation -> new FlakyInputStream(invocation.callRealMethod(), counter, failure))
.when(flakyClient)
.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class));
return flakyClient;
}

/** Wrapper for S3 client, used to mock the final class DefaultS3Client */
public static class S3ClientWrapper implements S3Client {

@@ -184,6 +240,50 @@ public CreateBucketResponse createBucket(CreateBucketRequest createBucketRequest
}
}

/** Wrapper for S3 Async client, used to mock the final class DefaultS3AsyncClient */
public static class S3AsyncClientWrapper implements S3AsyncClient {

private final S3AsyncClient delegate;

public S3AsyncClientWrapper(S3AsyncClient delegate) {
this.delegate = delegate;
}

@Override
public String serviceName() {
return delegate.serviceName();
}

@Override
public void close() {
delegate.close();
}

@Override
public <ReturnT> CompletableFuture<ReturnT> getObject(
GetObjectRequest getObjectRequest,
AsyncResponseTransformer<GetObjectResponse, ReturnT> asyncResponseTransformer) {
return delegate.getObject(getObjectRequest, asyncResponseTransformer);
}

@Override
public CompletableFuture<HeadObjectResponse> headObject(HeadObjectRequest headObjectRequest) {
return delegate.headObject(headObjectRequest);
}

@Override
public CompletableFuture<PutObjectResponse> putObject(
PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) {
return delegate.putObject(putObjectRequest, requestBody);
}

@Override
public CompletableFuture<CreateBucketResponse> createBucket(
CreateBucketRequest createBucketRequest) {
return delegate.createBucket(createBucketRequest);
}
}

static class FlakyInputStream extends InputStream {
private final ResponseInputStream<GetObjectResponse> delegate;
private final AtomicInteger counter;