Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import java.util.UUID;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.aws.AwsClientUtil;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
Expand Down Expand Up @@ -51,8 +52,9 @@ public class GlueTestBase {
static final List<String> namespaces = Lists.newArrayList();

// aws clients
static final GlueClient glue = AwsClientUtil.defaultGlueClient();
static final S3Client s3 = AwsClientUtil.defaultS3Client();
static final AwsClientFactory clientFactory = AwsClientFactories.defaultFactory();
static final GlueClient glue = clientFactory.glue();
static final S3Client s3 = clientFactory.s3();

// iceberg
static GlueCatalog glueCatalog;
Expand All @@ -64,13 +66,13 @@ public class GlueTestBase {
@BeforeClass
public static void beforeClass() {
String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
S3FileIO fileIO = new S3FileIO();
glueCatalog = new GlueCatalog(glue);
glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), fileIO);
S3FileIO fileIO = new S3FileIO(clientFactory::s3);
glueCatalog = new GlueCatalog();
glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), glue, fileIO);
AwsProperties properties = new AwsProperties();
properties.setGlueCatalogSkipArchive(true);
glueCatalogWithSkip = new GlueCatalog(glue);
glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, fileIO);
glueCatalogWithSkip = new GlueCatalog();
glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, glue, fileIO);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
import java.util.UUID;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import org.apache.iceberg.aws.AwsClientUtil;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -57,6 +60,7 @@

public class S3FileIOTest {

private static AwsClientFactory clientFactory;
private static S3Client s3;
private static KmsClient kms;
private static String bucketName;
Expand All @@ -69,8 +73,9 @@ public class S3FileIOTest {

@BeforeClass
public static void beforeClass() {
s3 = AwsClientUtil.defaultS3Client();
kms = AwsClientUtil.defaultKmsClient();
clientFactory = AwsClientFactories.defaultFactory();
s3 = clientFactory.s3();
kms = clientFactory.kms();
bucketName = AwsIntegTestUtil.testBucketName();
prefix = UUID.randomUUID().toString();
contentBytes = new byte[1024 * 1024 * 10];
Expand All @@ -94,13 +99,13 @@ public void before() {
public void testNewInputStream() throws Exception {
s3.putObject(PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
RequestBody.fromBytes(contentBytes));
S3FileIO s3FileIO = new S3FileIO();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
validateRead(s3FileIO);
}

@Test
public void testNewOutputStream() throws Exception {
S3FileIO s3FileIO = new S3FileIO();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
write(s3FileIO);
InputStream stream = s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(objectKey).build());
String result = IoUtils.toUtf8String(stream);
Expand All @@ -112,7 +117,7 @@ public void testNewOutputStream() throws Exception {
public void testSSE_S3() throws Exception {
AwsProperties properties = new AwsProperties();
properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_S3);
S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
write(s3FileIO);
validateRead(s3FileIO);
GetObjectResponse response = s3.getObject(
Expand All @@ -125,7 +130,7 @@ public void testSSE_KMS() throws Exception {
AwsProperties properties = new AwsProperties();
properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_KMS);
properties.setS3FileIoSseKey(kmsKeyArn);
S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
write(s3FileIO);
validateRead(s3FileIO);
GetObjectResponse response = s3.getObject(
Expand All @@ -138,7 +143,7 @@ public void testSSE_KMS() throws Exception {
public void testSSE_KMS_default() throws Exception {
AwsProperties properties = new AwsProperties();
properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_KMS);
S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
write(s3FileIO);
validateRead(s3FileIO);
GetObjectResponse response = s3.getObject(
Expand Down Expand Up @@ -167,7 +172,7 @@ public void testSSE_Custom() throws Exception {
properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM);
properties.setS3FileIoSseKey(encodedKey);
properties.setS3FileIoSseMd5(md5);
S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
write(s3FileIO);
validateRead(s3FileIO);
GetObjectResponse response = s3.getObject(
Expand All @@ -185,7 +190,7 @@ public void testSSE_Custom() throws Exception {
public void testACL() throws Exception {
AwsProperties properties = new AwsProperties();
properties.setS3FileIoAcl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
write(s3FileIO);
validateRead(s3FileIO);
GetObjectAclResponse response = s3.getObjectAcl(
Expand All @@ -195,6 +200,16 @@ public void testACL() throws Exception {
Assert.assertEquals(Permission.FULL_CONTROL, response.grants().get(0).permission());
}

@Test
public void testClientFactorySerialization() throws Exception {
S3FileIO fileIO = new S3FileIO();
fileIO.initialize(Maps.newHashMap());
write(fileIO);
byte [] data = SerializationUtils.serialize(fileIO);
S3FileIO fileIO2 = SerializationUtils.deserialize(data);
validateRead(fileIO2);
}

private void write(S3FileIO s3FileIO) throws Exception {
OutputFile outputFile = s3FileIO.newOutputFile(objectUri);
OutputStream outputStream = outputFile.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.iceberg.aws.AwsClientUtil;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.PositionOutputStream;
Expand All @@ -51,7 +51,7 @@ public class S3MultipartUploadTest {

@BeforeClass
public static void beforeClass() {
s3 = AwsClientUtil.defaultS3Client();
s3 = AwsClientFactories.defaultFactory().s3();
bucketName = AwsIntegTestUtil.testBucketName();
prefix = UUID.randomUUID().toString();
properties = new AwsProperties();
Expand Down
102 changes: 102 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws;

import java.util.Map;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.FileIO;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;

public class AwsClientFactories {

/**
* The implementation class of {@link AwsClientFactory} to customize AWS client configurations.
* If set, all AWS clients will be configured by the specified class before initialization.
*/
public static final String CLIENT_FACTORY_CONFIG_KEY = "client.factory";

private static final SdkHttpClient HTTP_CLIENT_DEFAULT = UrlConnectionHttpClient.create();
private static final DefaultAwsClientFactory AWS_CLIENT_FACTORY_DEFAULT = new DefaultAwsClientFactory();

private AwsClientFactories() {
}

public static AwsClientFactory defaultFactory() {
return AWS_CLIENT_FACTORY_DEFAULT;
}

public static AwsClientFactory from(Map<String, String> properties) {
if (properties.containsKey(CLIENT_FACTORY_CONFIG_KEY)) {
return loadClientFactory(properties.get(CLIENT_FACTORY_CONFIG_KEY), properties);
} else {
return defaultFactory();
}
}

private static AwsClientFactory loadClientFactory(String impl, Map<String, String> properties) {
DynConstructors.Ctor<AwsClientFactory> ctor;
try {
ctor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(String.format(
"Cannot initialize AwsClientFactory, missing no-arg constructor: %s", impl), e);
}

AwsClientFactory factory;
try {
factory = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize AwsClientFactory, %s does not implement AwsClientFactory.", impl), e);
}

factory.initialize(properties);
return factory;
}

static class DefaultAwsClientFactory implements AwsClientFactory {

DefaultAwsClientFactory() {
}

@Override
public S3Client s3() {
return S3Client.builder().httpClient(HTTP_CLIENT_DEFAULT).build();
}

@Override
public GlueClient glue() {
return GlueClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build();
}

@Override
public KmsClient kms() {
return KmsClient.builder().httpClient(HTTP_CLIENT_DEFAULT).build();
}

@Override
public void initialize(Map<String, String> properties) {
}
}
}
58 changes: 58 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws;

import java.io.Serializable;
import java.util.Map;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;

/**
* Interface to customize AWS clients used by Iceberg.
* A custom factory must have a no-arg constructor, and use {@link #initialize(Map)} to initialize the factory.
*/
public interface AwsClientFactory extends Serializable {

/**
* create a Amazon S3 client
* @return s3 client
*/
S3Client s3();

/**
* create a AWS Glue client
* @return glue client
*/
GlueClient glue();

/**
* Create a AWS KMS client
* @return kms client
*/
KmsClient kms();

/**
* Initialize AWS client factory from catalog properties.
* @param properties catalog properties
*/
void initialize(Map<String, String> properties);

}
57 changes: 0 additions & 57 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java

This file was deleted.

Loading