Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import software.amazon.awssdk.services.s3control.S3ControlClient;
import software.amazon.awssdk.utils.ImmutableMap;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;

public class TestS3FileIOIntegration {

Expand Down Expand Up @@ -255,6 +256,48 @@ public void testNewInputStreamWithMultiRegionAccessPoint() throws Exception {
validateRead(s3FileIO);
}

// Uploads the test object with the raw S3 client, then runs the shared read validation through an
// S3FileIO that was initialized with only the analytics accelerator flag switched on.
@Test
public void testNewInputStreamWithAnalyticsAccelerator() throws Exception {
  PutObjectRequest uploadRequest =
      PutObjectRequest.builder().bucket(bucketName).key(objectKey).build();
  s3.putObject(uploadRequest, RequestBody.fromBytes(contentBytes));

  String enabled = String.valueOf(true);
  S3FileIO acceleratedFileIO = new S3FileIO();
  acceleratedFileIO.initialize(
      ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, enabled));

  validateRead(acceleratedFileIO);
}

// Same read validation as the plain analytics accelerator test, but with the CRT flag enabled as
// well so that both properties are set on the S3FileIO under test.
@Test
public void testNewInputStreamWithAnalyticsAcceleratorAndCRT() throws Exception {
  PutObjectRequest uploadRequest =
      PutObjectRequest.builder().bucket(bucketName).key(objectKey).build();
  s3.putObject(uploadRequest, RequestBody.fromBytes(contentBytes));

  String enabled = String.valueOf(true);
  S3FileIO acceleratedFileIO = new S3FileIO();
  acceleratedFileIO.initialize(
      ImmutableMap.of(
          S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED,
          enabled,
          S3FileIOProperties.S3_CRT_ENABLED,
          enabled));

  validateRead(acceleratedFileIO);
}

// Enables the analytics accelerator and passes one accelerator-specific setting (the prefetching
// mode) through the prefixed property namespace, then writes, validates and deletes a ".parquet"
// object through the same S3FileIO.
@Test
public void testNewInputStreamWithAnalyticsAcceleratorCustomConfigured() throws Exception {
  final String prefetchModeKey = "logicalio.prefetching.mode";
  final String location = String.format("s3://%s/%s/%s.parquet", bucketName, prefix, objectKey);

  S3FileIO acceleratedFileIO = new S3FileIO();
  acceleratedFileIO.initialize(
      ImmutableMap.of(
          S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED,
          String.valueOf(true),
          S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_PROPERTIES_PREFIX + prefetchModeKey,
          PrefetchMode.ALL.name()));

  write(acceleratedFileIO, location);
  validateRead(acceleratedFileIO, location);
  acceleratedFileIO.deleteFile(location);
}

@Test
public void testNewOutputStream() throws Exception {
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
Expand Down Expand Up @@ -324,6 +367,19 @@ public void testNewOutputStreamWithMultiRegionAccessPoint() throws Exception {
}
}

// Writes through an S3FileIO that has the analytics accelerator enabled, then fetches the object
// with the raw S3 client and checks that the stored text equals the expected content.
@Test
public void testNewOutputStreamWithAnalyticsAccelerator() throws Exception {
  S3FileIO acceleratedFileIO = new S3FileIO();
  acceleratedFileIO.initialize(
      ImmutableMap.of(S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED, String.valueOf(true)));
  write(acceleratedFileIO);

  GetObjectRequest fetchRequest =
      GetObjectRequest.builder().bucket(bucketName).key(objectKey).build();
  try (InputStream objectStream = s3.getObject(fetchRequest)) {
    String actual = IoUtils.toUtf8String(objectStream);
    assertThat(actual).isEqualTo(content);
  }
}

@Test
public void testServerSideS3Encryption() throws Exception {
S3FileIOProperties properties = new S3FileIOProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
Expand All @@ -52,6 +55,14 @@ public S3Client s3() {
.build();
}

/**
 * Creates an async S3 client configured with the assume-role credentials and region.
 *
 * <p>The CRT-based builder is used when CRT is enabled in the S3 FileIO properties; otherwise the
 * standard async builder is used.
 */
@Override
public S3AsyncClient s3Async() {
  if (!s3FileIOProperties.isS3CRTEnabled()) {
    return S3AsyncClient.builder().applyMutation(this::applyAssumeRoleConfigurations).build();
  }

  return S3AsyncClient.crtBuilder().applyMutation(this::applyAssumeRoleConfigurations).build();
}

@Override
public GlueClient glue() {
return GlueClient.builder()
Expand Down Expand Up @@ -95,24 +106,25 @@ public void initialize(Map<String, String> properties) {

/**
 * Applies the assume-role credentials provider and the assume-role region to a synchronous AWS
 * client builder.
 *
 * <p>The credentials provider comes from {@code createCredentialsProvider()} so that it is built
 * exactly once per call; no separate provider (and no extra STS client) is created only to be
 * overwritten.
 *
 * @param clientBuilder the builder to configure
 * @param <T> a builder type that is both an AWS client builder and a sync client builder
 * @return the same builder instance that was passed in
 */
protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
    T clientBuilder) {
  clientBuilder
      .credentialsProvider(createCredentialsProvider())
      .region(Region.of(awsProperties.clientAssumeRoleRegion()));
  return clientBuilder;
}

/**
 * Applies the assume-role credentials provider and the assume-role region to a standard async S3
 * client builder.
 *
 * @param clientBuilder the async builder to configure
 * @return the builder returned by the final configuration call
 */
protected S3AsyncClientBuilder applyAssumeRoleConfigurations(S3AsyncClientBuilder clientBuilder) {
  S3AsyncClientBuilder withCredentials =
      clientBuilder.credentialsProvider(createCredentialsProvider());
  return withCredentials.region(Region.of(awsProperties.clientAssumeRoleRegion()));
}

/**
 * Applies the assume-role credentials provider and the assume-role region to a CRT-based async
 * S3 client builder.
 *
 * @param clientBuilder the CRT async builder to configure
 * @return the builder returned by the final configuration call
 */
protected S3CrtAsyncClientBuilder applyAssumeRoleConfigurations(
    S3CrtAsyncClientBuilder clientBuilder) {
  S3CrtAsyncClientBuilder withCredentials =
      clientBuilder.credentialsProvider(createCredentialsProvider());
  return withCredentials.region(Region.of(awsProperties.clientAssumeRoleRegion()));
}

/**
 * Returns the region configured for the assume-role client.
 *
 * @return the value of {@code awsProperties.clientAssumeRoleRegion()}
 */
protected String region() {
  String assumeRoleRegion = awsProperties.clientAssumeRoleRegion();
  return assumeRoleRegion;
}
Expand Down Expand Up @@ -145,4 +157,21 @@ private String genSessionName() {
}
return String.format("iceberg-aws-%s", UUID.randomUUID());
}

/**
 * Builds an STS assume-role credentials provider from a fresh STS client ({@code sts()}) and the
 * refresh request produced by {@code createAssumeRoleRequest()}.
 */
private StsAssumeRoleCredentialsProvider createCredentialsProvider() {
  StsAssumeRoleCredentialsProvider.Builder providerBuilder =
      StsAssumeRoleCredentialsProvider.builder();
  providerBuilder = providerBuilder.stsClient(sts());
  providerBuilder = providerBuilder.refreshRequest(createAssumeRoleRequest());
  return providerBuilder.build();
}

/**
 * Builds the assume-role request from the configured role ARN, this factory's role session name,
 * and the configured timeout, external id and STS tags.
 */
private AssumeRoleRequest createAssumeRoleRequest() {
  AssumeRoleRequest.Builder request = AssumeRoleRequest.builder();
  request = request.roleArn(awsProperties.clientAssumeRoleArn());
  request = request.roleSessionName(roleSessionName);
  request = request.durationSeconds(awsProperties.clientAssumeRoleTimeoutSec());
  request = request.externalId(awsProperties.clientAssumeRoleExternalId());
  request = request.tags(awsProperties.stsClientAssumeRoleTags());
  return request.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
Expand Down Expand Up @@ -118,6 +119,14 @@ public S3Client s3() {
.build();
}

/**
 * Creates an async S3 client: CRT-based when CRT is enabled in the S3 FileIO properties, the
 * standard async client otherwise.
 *
 * <p>NOTE(review): both builders are built without any further configuration being applied here
 * - confirm that relying on the SDK defaults is intended.
 */
@Override
public S3AsyncClient s3Async() {
  return s3FileIOProperties.isS3CRTEnabled()
      ? S3AsyncClient.crtBuilder().build()
      : S3AsyncClient.builder().build();
}

@Override
public GlueClient glue() {
return GlueClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
Expand All @@ -38,6 +39,13 @@ public interface AwsClientFactory extends Serializable {
*/
S3Client s3();

/**
 * create an Amazon S3 async client
 *
 * @return s3 async client
 */
S3AsyncClient s3Async();

/**
* create a AWS Glue client
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3;

import java.io.IOException;
import org.apache.iceberg.io.SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;

/**
 * Adapter that exposes an analytics accelerator {@link S3SeekableInputStream} as an Iceberg
 * {@link SeekableInputStream}. Every operation is forwarded to the wrapped stream unchanged.
 */
class AnalyticsAcceleratorInputStreamWrapper extends SeekableInputStream {

  /** The accelerator stream that actually serves reads, seeks and position queries. */
  private final S3SeekableInputStream acceleratorStream;

  /**
   * Wraps the given accelerator stream.
   *
   * @param stream the stream to forward all calls to
   */
  AnalyticsAcceleratorInputStreamWrapper(S3SeekableInputStream stream) {
    acceleratorStream = stream;
  }

  /** Forwards a single-byte read to the wrapped stream. */
  @Override
  public int read() throws IOException {
    return acceleratorStream.read();
  }

  /** Forwards a read that targets the whole of {@code buffer} to the wrapped stream. */
  @Override
  public int read(byte[] buffer) throws IOException {
    int length = buffer.length;
    return acceleratorStream.read(buffer, 0, length);
  }

  /** Forwards a ranged read into {@code buffer} to the wrapped stream. */
  @Override
  public int read(byte[] buffer, int offset, int length) throws IOException {
    return acceleratorStream.read(buffer, offset, length);
  }

  /** Forwards the seek to the wrapped stream. */
  @Override
  public void seek(long newPos) throws IOException {
    acceleratorStream.seek(newPos);
  }

  /** Returns the position reported by the wrapped stream. */
  @Override
  public long getPos() {
    return acceleratorStream.getPos();
  }

  /** Closes the wrapped stream. */
  @Override
  public void close() throws IOException {
    acceleratorStream.close();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.util.Pair;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.s3.analyticsaccelerator.ObjectClientConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/**
 * Static helpers for opening Iceberg {@link SeekableInputStream}s through the S3 analytics
 * accelerator.
 *
 * <p>Stream factories are cached per (async client, S3 FileIO properties) pair. A factory that
 * leaves the cache, either through size-based eviction or through {@link #cleanupCache}, is
 * closed so that the resources it holds are not leaked.
 */
class AnalyticsAcceleratorUtil {

  private static final Cache<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>
      STREAM_FACTORY_CACHE =
          Caffeine.newBuilder()
              .maximumSize(100)
              // Close factories when they are evicted or invalidated; otherwise they would be
              // dropped from the cache while still holding whatever they allocated.
              .<Pair<S3AsyncClient, S3FileIOProperties>, S3SeekableInputStreamFactory>
                  removalListener((key, factory, cause) -> closeFactory(factory))
              .build();

  private AnalyticsAcceleratorUtil() {}

  /**
   * Opens an analytics accelerator backed stream for the given input file.
   *
   * <p>The object's content length and ETag are taken from the file's object metadata and passed
   * to the accelerator when the stream is created. The stream factory is looked up in (or added
   * to) the cache using the file's async client and S3 FileIO properties as key.
   *
   * @param inputFile the S3 input file to read
   * @return a seekable stream wrapping the accelerator stream
   * @throws RuntimeIOException if the accelerator fails to create the stream
   */
  public static SeekableInputStream newStream(S3InputFile inputFile) {
    S3URI uri = S3URI.of(inputFile.uri().bucket(), inputFile.uri().key());
    HeadObjectResponse metadata = inputFile.getObjectMetadata();
    OpenStreamInformation openStreamInfo =
        OpenStreamInformation.builder()
            .objectMetadata(
                ObjectMetadata.builder()
                    .contentLength(metadata.contentLength())
                    .etag(metadata.eTag())
                    .build())
            .build();

    S3SeekableInputStreamFactory factory =
        STREAM_FACTORY_CACHE.get(
            Pair.of(inputFile.asyncClient(), inputFile.s3FileIOProperties()),
            AnalyticsAcceleratorUtil::createNewFactory);

    try {
      S3SeekableInputStream seekableInputStream = factory.createStream(uri, openStreamInfo);
      return new AnalyticsAcceleratorInputStreamWrapper(seekableInputStream);
    } catch (IOException e) {
      throw new RuntimeIOException(
          e, "Failed to create S3 analytics accelerator input stream for: %s", inputFile.uri());
    }
  }

  /**
   * Builds a new stream factory for a cache key, configuring both the stream and the object
   * client from the analytics accelerator properties held by the key's S3 FileIO properties.
   */
  private static S3SeekableInputStreamFactory createNewFactory(
      Pair<S3AsyncClient, S3FileIOProperties> cacheKey) {
    ConnectorConfiguration connectorConfiguration =
        new ConnectorConfiguration(cacheKey.second().s3AnalyticsacceleratorProperties());
    S3SeekableInputStreamConfiguration streamConfiguration =
        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
    ObjectClientConfiguration objectClientConfiguration =
        ObjectClientConfiguration.fromConfiguration(connectorConfiguration);

    ObjectClient objectClient = new S3SdkObjectClient(cacheKey.first(), objectClientConfiguration);
    return new S3SeekableInputStreamFactory(objectClient, streamConfiguration);
  }

  /**
   * Removes the factory cached for the given client and properties, if any. The removed factory
   * is closed by the cache's removal listener.
   *
   * <p>NOTE(review): streams previously created by that factory may stop working once it is
   * closed - confirm callers only invoke this after they are done reading.
   *
   * @param asyncClient the async client half of the cache key
   * @param s3FileIOProperties the properties half of the cache key
   */
  public static void cleanupCache(
      S3AsyncClient asyncClient, S3FileIOProperties s3FileIOProperties) {
    STREAM_FACTORY_CACHE.invalidate(Pair.of(asyncClient, s3FileIOProperties));
  }

  /**
   * Closes a factory that was removed from the cache.
   *
   * <p>A close failure is rethrown unchecked rather than swallowed. Per the Caffeine
   * documentation, exceptions thrown by a removal listener are logged by the cache and not
   * propagated to the cache user.
   */
  private static void closeFactory(S3SeekableInputStreamFactory factory) {
    if (factory == null) {
      return;
    }

    try {
      factory.close();
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to close S3 analytics accelerator stream factory");
    }
  }
}
Loading