Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import software.amazon.awssdk.identity.spi.AwsSessionCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ServiceClientConfiguration;
import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException;
Expand Down Expand Up @@ -507,17 +506,13 @@ public void fileIOWithPrefixedS3ClientWithoutCredentialsSerialization(
io.initialize(Map.of(AwsClientProperties.CLIENT_REGION, "us-east-1"));

assertThat(io.client()).isInstanceOf(S3Client.class);
assertThat(io.asyncClient()).isInstanceOf(S3AsyncClient.class);
assertThat(io.client("s3a://my-bucket/my-path")).isInstanceOf(S3Client.class);
assertThat(io.asyncClient("s3a://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class);

S3FileIO fileIO = roundTripSerializer.apply(io);
assertThat(fileIO.credentials()).isEqualTo(io.credentials()).isEmpty();

assertThat(fileIO.client()).isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient()).isInstanceOf(S3AsyncClient.class);
assertThat(fileIO.client("s3a://my-bucket/my-path")).isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient("s3a://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class);
}

@ParameterizedTest
Expand All @@ -533,18 +528,14 @@ public void fileIOWithPrefixedS3ClientSerialization(

// there should be a client for the generic and specific storage prefix available
assertThat(io.client()).isInstanceOf(S3Client.class);
assertThat(io.asyncClient()).isInstanceOf(S3AsyncClient.class);
assertThat(io.client("s3://my-bucket/my-path")).isInstanceOf(S3Client.class);
assertThat(io.asyncClient("s3://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class);

S3FileIO fileIO = roundTripSerializer.apply(io);
assertThat(fileIO.credentials()).isEqualTo(io.credentials());

// make sure there's a client for the generic and specific storage prefix available after ser/de
assertThat(fileIO.client()).isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient()).isInstanceOf(S3AsyncClient.class);
assertThat(fileIO.client("s3://my-bucket/my-path")).isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient("s3://my-bucket/my-path")).isInstanceOf(S3AsyncClient.class);
}

@Test
Expand Down Expand Up @@ -641,9 +632,6 @@ public void resolvingFileIOLoadWithoutStorageCredentials(
assertThat(fileIO.client("s3://foo/bar"))
.isSameAs(fileIO.client())
.isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient("s3://foo/bar"))
.isSameAs(fileIO.asyncClient())
.isInstanceOf(S3AsyncClient.class);
});

// make sure credentials can be accessed after serde
Expand All @@ -662,9 +650,6 @@ public void resolvingFileIOLoadWithoutStorageCredentials(
assertThat(fileIO.client("s3://foo/bar"))
.isSameAs(fileIO.client())
.isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient("s3://foo/bar"))
.isSameAs(fileIO.asyncClient())
.isInstanceOf(S3AsyncClient.class);
});
}

Expand Down Expand Up @@ -696,9 +681,6 @@ public void resolvingFileIOLoadWithStorageCredentials(
assertThat(fileIO.client("s3://foo/bar"))
.isNotSameAs(fileIO.client())
.isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient("s3://foo/bar"))
.isNotSameAs(fileIO.asyncClient())
.isInstanceOf(S3AsyncClient.class);
});

// make sure credentials are still present after serde
Expand All @@ -721,9 +703,6 @@ public void resolvingFileIOLoadWithStorageCredentials(
assertThat(fileIO.client("s3://foo/bar"))
.isNotSameAs(fileIO.client())
.isInstanceOf(S3Client.class);
assertThat(fileIO.asyncClient("s3://foo/bar"))
.isNotSameAs(fileIO.asyncClient())
.isInstanceOf(S3AsyncClient.class);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
Expand All @@ -56,26 +54,6 @@ public S3Client s3() {
.build();
}

/**
 * Builds the S3 async client for this factory.
 *
 * <p>When {@code s3FileIOProperties.isS3CRTEnabled()} is false, the standard async builder is
 * used; otherwise the CRT builder is used. The legacy MD5 plugin mutation is applied only on the
 * standard path, and the S3 CRT configurations only on the CRT path.
 *
 * @return a newly built {@link S3AsyncClient}
 */
@Override
public S3AsyncClient s3Async() {
  // Standard (non-CRT) async client.
  if (!s3FileIOProperties.isS3CRTEnabled()) {
    return S3AsyncClient.builder()
        .applyMutation(this::applyAssumeRoleConfigurations)
        .applyMutation(awsClientProperties::applyClientRegionConfiguration)
        .applyMutation(awsClientProperties::applyClientCredentialConfigurations)
        .applyMutation(awsClientProperties::applyLegacyMd5Plugin)
        .applyMutation(s3FileIOProperties::applyEndpointConfigurations)
        .build();
  }

  // CRT-based async client; mutations are applied in the same order as before.
  return S3AsyncClient.crtBuilder()
      .applyMutation(this::applyAssumeRoleConfigurations)
      .applyMutation(awsClientProperties::applyClientRegionConfiguration)
      .applyMutation(awsClientProperties::applyClientCredentialConfigurations)
      .applyMutation(s3FileIOProperties::applyEndpointConfigurations)
      .applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
      .build();
}

@Override
public GlueClient glue() {
return GlueClient.builder()
Expand Down Expand Up @@ -125,12 +103,6 @@ protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleC
return clientBuilder;
}

/**
 * Applies the assume-role credentials provider and the assume-role region to an S3 async client
 * builder.
 *
 * @param clientBuilder the S3 async client builder to configure
 * @return the builder returned by the final {@code region(...)} call
 */
protected S3AsyncClientBuilder applyAssumeRoleConfigurations(S3AsyncClientBuilder clientBuilder) {
  // Credentials are resolved and applied first, then the region, matching the original chain.
  S3AsyncClientBuilder withCredentials =
      clientBuilder.credentialsProvider(createCredentialsProvider());
  Region assumeRoleRegion = Region.of(awsProperties.clientAssumeRoleRegion());
  return withCredentials.region(assumeRoleRegion);
}

protected S3CrtAsyncClientBuilder applyAssumeRoleConfigurations(
S3CrtAsyncClientBuilder clientBuilder) {
return clientBuilder
Expand Down
21 changes: 0 additions & 21 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
Expand Down Expand Up @@ -120,26 +119,6 @@ public S3Client s3() {
.build();
}

/**
 * Builds the S3 async client for this factory.
 *
 * <p>When {@code s3FileIOProperties.isS3CRTEnabled()} is false, the standard async builder is
 * used and additionally receives the legacy MD5 plugin mutation; otherwise the CRT builder is
 * used and additionally receives the S3 CRT configurations.
 *
 * @return a newly built {@link S3AsyncClient}
 */
@Override
public S3AsyncClient s3Async() {
  // Standard (non-CRT) async client.
  if (!s3FileIOProperties.isS3CRTEnabled()) {
    return S3AsyncClient.builder()
        .applyMutation(awsClientProperties::applyClientRegionConfiguration)
        .applyMutation(awsClientProperties::applyLegacyMd5Plugin)
        .applyMutation(
            builder ->
                s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, builder))
        .applyMutation(s3FileIOProperties::applyEndpointConfigurations)
        .build();
  }

  // CRT-based async client.
  return S3AsyncClient.crtBuilder()
      .applyMutation(awsClientProperties::applyClientRegionConfiguration)
      .applyMutation(
          builder ->
              s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, builder))
      .applyMutation(s3FileIOProperties::applyEndpointConfigurations)
      .applyMutation(s3FileIOProperties::applyS3CrtConfigurations)
      .build();
}

@Override
public GlueClient glue() {
return GlueClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
Expand All @@ -39,13 +38,6 @@ public interface AwsClientFactory extends Serializable {
*/
S3Client s3();

/**
 * Create an Amazon S3 async client.
 *
 * @return s3 async client
 */
S3AsyncClient s3Async();

/**
* create an AWS Glue client
*
Expand Down
31 changes: 0 additions & 31 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;

public class AwsClientProperties implements Serializable {
/**
Expand Down Expand Up @@ -148,21 +147,6 @@ void applyClientRegionConfiguration(BuilderT builder) {
}
}

/**
 * Sets the region on an S3 CRT client builder when a client region has been configured; the
 * builder is left untouched otherwise.
 *
 * <p>Sample usage:
 *
 * <pre>
 * S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientRegionConfiguration)
 * </pre>
 *
 * @param builder the S3 CRT async client builder to configure
 */
public <T extends S3CrtAsyncClientBuilder> void applyClientRegionConfiguration(T builder) {
  if (clientRegion == null) {
    // No region configured: nothing to apply.
    return;
  }

  builder.region(Region.of(clientRegion));
}

/**
* Configure the credential provider for AWS clients.
*
Expand All @@ -179,21 +163,6 @@ void applyClientCredentialConfigurations(BuilderT builder) {
}
}

/**
 * Sets the credentials provider on an S3 CRT client builder when a client credentials provider
 * has been configured (non-null and non-empty); the builder is left untouched otherwise.
 *
 * <p>Sample usage:
 *
 * <pre>
 * S3AsyncClient.crtBuilder().applyMutation(awsClientProperties::applyClientCredentialConfigurations)
 * </pre>
 *
 * @param builder the S3 CRT async client builder to configure
 */
public <T extends S3CrtAsyncClientBuilder> void applyClientCredentialConfigurations(T builder) {
  if (Strings.isNullOrEmpty(this.clientCredentialsProvider)) {
    // No provider configured: nothing to apply.
    return;
  }

  builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider));
}

/**
* Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an
* instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,68 @@
package org.apache.iceberg.aws.s3;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.FileRange;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A wrapper to convert {@link S3SeekableInputStream} to Iceberg {@link SeekableInputStream} */
class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {
class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream implements RangeReadable {
private static final Logger LOG =
LoggerFactory.getLogger(AnalyticsAcceleratorInputStreamWrapper.class);

private final S3SeekableInputStream delegate;
private final Counter readBytes;
private final Counter readOperations;

/**
 * Wraps the given stream, using {@link MetricsContext#nullMetrics()} as the metrics context.
 *
 * @param stream the stream to delegate to
 */
AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) {
this(stream, MetricsContext.nullMetrics());
}

/**
 * Wraps the given stream and obtains the read-bytes and read-operations counters from the
 * supplied metrics context.
 *
 * @param stream the stream to delegate to
 * @param metrics the metrics context the counters are created from
 */
AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream, MetricsContext metrics) {
  delegate = stream;
  // Counters are requested in the same order as before: bytes first, then operations.
  readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, MetricsContext.Unit.BYTES);
  readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS);
}

@Override
public int read() throws IOException {
return this.delegate.read();
int nextByteValue = this.delegate.read();
if (nextByteValue != -1) {
readBytes.increment();
}
readOperations.increment();
return nextByteValue;
}

@Override
public int read(byte[] b) throws IOException {
return this.delegate.read(b, 0, b.length);
public int read(byte[] b, int off, int len) throws IOException {
int bytesRead = this.delegate.read(b, off, len);
if (bytesRead > 0) {
readBytes.increment(bytesRead);
}
readOperations.increment();
return bytesRead;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return this.delegate.read(b, off, len);
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
this.delegate.readFully(position, buffer, offset, length);
}

/**
 * Delegates a tail read to the wrapped stream. No counters are updated by this method.
 *
 * @param buffer destination array
 * @param offset offset passed through to the delegate
 * @param length length passed through to the delegate
 * @return the value returned by the delegate's {@code readTail}
 * @throws IOException if the delegate throws
 */
@Override
public int readTail(byte[] buffer, int offset, int length) throws IOException {
  int tailBytesRead = delegate.readTail(buffer, offset, length);
  return tailBytesRead;
}

@Override
Expand All @@ -56,6 +93,21 @@ public long getPos() {
return this.delegate.getPos();
}

/**
 * Issues a vectored read through the wrapped stream.
 *
 * <p>Each {@link FileRange} is converted to an {@link ObjectRange} built from its {@code
 * byteBuffer()}, {@code offset()} and {@code length()}. The read-operations counter is
 * incremented once per call, before delegating; bytes read are not tracked here.
 *
 * @param ranges the ranges to read
 * @param allocate buffer allocator passed through to the delegate
 * @throws IOException if the delegate throws
 */
@Override
public void readVectored(List<FileRange> ranges, IntFunction<ByteBuffer> allocate)
    throws IOException {
  List<ObjectRange> acceleratorRanges =
      ranges.stream()
          .map(
              fileRange ->
                  new ObjectRange(
                      fileRange.byteBuffer(), fileRange.offset(), fileRange.length()))
          .collect(Collectors.toList());

  // This does not keep track of bytes read and that is a possible improvement.
  readOperations.increment();
  delegate.readVectored(
      acceleratorRanges,
      allocate,
      released -> LOG.info("Release buffer of length {}: {}", released.limit(), released));
}

@Override
public void close() throws IOException {
this.delegate.close();
Expand Down
Loading
Loading