From ec5dc8e0db2a8773a334a22f29f40b99df042cdb Mon Sep 17 00:00:00 2001 From: mikewu Date: Mon, 15 Nov 2021 13:48:31 +0800 Subject: [PATCH 1/8] Aliyun: Add OSSFileIO --- .../iceberg/aliyun/AliyunClientFactory.java | 79 +++++++++ .../iceberg/aliyun/AliyunProperties.java | 48 +++++ .../aliyun/DefaultAliyunClientFactory.java | 48 +++++ .../apache/iceberg/aliyun/oss/OSSFileIO.java | 105 +++++++++++ .../iceberg/aliyun/oss/AliyunOSSTestBase.java | 11 ++ .../iceberg/aliyun/oss/TestOSSFileIO.java | 167 ++++++++++++++++++ 6 files changed, 458 insertions(+) create mode 100644 aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java create mode 100644 aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java create mode 100644 aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java create mode 100644 aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java new file mode 100644 index 000000000000..a283d5cce3df --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import java.io.Serializable; +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; + +public interface AliyunClientFactory extends Serializable { + /** + * Create an aliyun OSS client. + * + * @return oss client. + */ + OSS oss(); + + /** + * Initialize Aliyun client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(Map properties); + + /** + * Returns an initialized {@link AliyunProperties} + */ + AliyunProperties aliyunProperties(); + + static AliyunClientFactory load(Map properties) { + String impl = properties.getOrDefault("client.factory", DefaultAliyunClientFactory.class.getName()); + return load(impl, properties); + } + + /** + * Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it. + * + * @param impl the class name. + * @param properties to initialize the factory. + * @return an initialized {@link AliyunClientFactory}. + */ + static AliyunClientFactory load(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e); + } + + AliyunClientFactory factory; + try { + factory = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e); + } + + factory.initialize(properties); + return factory; + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java index e63940281d01..40ddc22b8c87 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java @@ -25,10 +25,41 @@ import org.apache.iceberg.util.PropertyUtil; public class AliyunProperties implements Serializable { + /** + * The domain name used to access OSS. OSS uses HTTP Restful APIs to provide services. Different regions are accessed + * by using different endpoints. For the same region, access over the internal network or over the Internet also uses + * different endpoints. For more information, see: + * https://www.alibabacloud.com/help/doc-detail/31837.htm + */ + public static final String OSS_ENDPOINT = "oss.endpoint"; + + /** + * Aliyun uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric + * encryption and verify the identity of a requester. The AccessKey ID is used to identify a user. + *

+ * For more information about how to obtain an AccessKey pair, see: + * https://www.alibabacloud.com/help/doc-detail/53045.htm + */ + public static final String ACCESS_KEY_ID = "access.key.id"; + + /** + * Aliyun uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric + * encryption and verify the identity of a requester. The AccessKey secret is used to encrypt and verify the + * signature string. + *

+ * For more information about how to obtain an AccessKey pair, see: + * https://www.alibabacloud.com/help/doc-detail/53045.htm + */ + public static final String ACCESS_KEY_SECRET = "access.key.secret"; + /** * Location to put staging files for uploading to OSS, defaults to the directory value of java.io.tmpdir. */ public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir"; + + private final String ossEndpoint; + private final String accessKeyId; + private final String accessKeySecret; private final String ossStagingDirectory; public AliyunProperties() { @@ -36,10 +67,27 @@ public AliyunProperties() { } public AliyunProperties(Map properties) { + // OSS endpoint, accessKeyId, accessKeySecret. + this.ossEndpoint = properties.get(OSS_ENDPOINT); + this.accessKeyId = properties.get(ACCESS_KEY_ID); + this.accessKeySecret = properties.get(ACCESS_KEY_SECRET); + this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY, System.getProperty("java.io.tmpdir")); } + public String ossEndpoint() { + return ossEndpoint; + } + + public String accessKeyId() { + return accessKeyId; + } + + public String accessKeySecret() { + return accessKeySecret; + } + public String ossStagingDirectory() { return ossStagingDirectory; } diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java new file mode 100644 index 000000000000..2d862de10d7e --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class DefaultAliyunClientFactory implements AliyunClientFactory { + private AliyunProperties aliyunProperties; + + @Override + public OSS oss() { + Preconditions.checkNotNull(aliyunProperties, + "Cannot create aliyun oss client before initializing the AliyunClientFactory."); + + return new OSSClientBuilder().build( + aliyunProperties.ossEndpoint(), aliyunProperties.accessKeyId(), aliyunProperties.accessKeySecret()); + } + + @Override + public void initialize(Map properties) { + this.aliyunProperties = new AliyunProperties(properties); + } + + @Override + public AliyunProperties aliyunProperties() { + return aliyunProperties; + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java new file mode 100644 index 000000000000..60fe8640b1e2 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.aliyun.AliyunClientFactory; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.SerializableSupplier; + +/** + * FileIO implementation backend by OSS. + *

+ * Locations used must follow the conventions for OSS URIs (e.g. oss://bucket/path...). + * URIs with scheme https are also treated as oss file paths. + * Using this FileIO with other schemes with result in {@link org.apache.iceberg.exceptions.ValidationException} + */ +public class OSSFileIO implements FileIO { + + private SerializableSupplier oss; + private AliyunProperties aliyunProperties; + private transient OSS client; + private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); + + /** + * No-arg constructor to load the FileIO dynamically. + *

+ * All fields are initialized by calling {@link OSSFileIO#initialize(Map)} later. + */ + public OSSFileIO() { + } + + /** + * Constructor with custom oss supplier and default aliyun properties. + *

+ * Calling {@link OSSFileIO#initialize(Map)} will overwrite information set in this constructor. + * + * @param oss oss supplier + */ + public OSSFileIO(SerializableSupplier oss) { + this.oss = oss; + this.aliyunProperties = new AliyunProperties(); + } + + @Override + public InputFile newInputFile(String path) { + return new OSSInputFile(client(), new OSSURI(path)); + } + + @Override + public OutputFile newOutputFile(String path) { + return new OSSOutputFile(client(), new OSSURI(path), aliyunProperties); + } + + @Override + public void deleteFile(String path) { + OSSURI location = new OSSURI(path); + client().deleteObject(location.bucket(), location.key()); + } + + private OSS client() { + if (client == null) { + client = oss.get(); + } + return client; + } + + @Override + public void initialize(Map properties) { + AliyunClientFactory factory = AliyunClientFactory.load(properties); + this.aliyunProperties = factory.aliyunProperties(); + this.oss = factory::oss; + } + + @Override + public void close() { + // handles concurrent calls to close() + if (isResourceClosed.compareAndSet(false, true)) { + if (client != null) { + client.shutdown(); + } + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java index 24e89bf8c2ef..284bddd740a6 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aliyun.oss; import com.aliyun.oss.OSS; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.util.SerializableSupplier; import org.junit.After; import org.junit.Before; @@ -33,14 +34,20 @@ public abstract class AliyunOSSTestBase { private final String bucketName = OSS_TEST_RULE.testBucketName(); private final String keyPrefix = OSS_TEST_RULE.keyPrefix(); + private OSSFileIO fileIO; + @Before public void before() { + fileIO = new OSSFileIO(ossClient); + OSS_TEST_RULE.setUpBucket(bucketName); } @After public void after() { OSS_TEST_RULE.tearDownBucket(bucketName); + + fileIO.close(); } protected String location(String key) { @@ -50,4 +57,8 @@ protected String location(String key) { protected SerializableSupplier ossClient() { return ossClient; } + + protected FileIO fileIO() { + return fileIO; + } } diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java new file mode 100644 index 000000000000..278f67de55af --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSClientBuilder; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.apache.iceberg.util.SerializableSupplier; +import org.apache.iceberg.util.SerializationUtil; +import org.junit.Assert; +import org.junit.Test; + +public class TestOSSFileIO extends AliyunOSSTestBase { + private static final String OSS_IMPL_CLASS = OSSFileIO.class.getName(); + + private final OSS ossClient = ossClient().get(); + private final Random random = ThreadLocalRandom.current(); + private final Configuration conf = new Configuration(); + + @Test + public void testOutputFile() throws IOException { + String location = randomLocation(); + int dataSize = 1024 * 10; + byte[] data = randomData(dataSize); + + OutputFile out = fileIO().newOutputFile(location); + try (OutputStream os = out.create(); InputStream is = new ByteArrayInputStream(data)) { + ByteStreams.copy(is, os); + } + + OSSURI uri = new OSSURI(location); + Assert.assertTrue("OSS file should exist", ossClient.doesObjectExist(uri.bucket(), uri.key())); + Assert.assertEquals("Should have expected location", location, out.location()); + Assert.assertEquals("Should have expected length", ossDataLength(uri), dataSize); + Assert.assertArrayEquals("Should have expected content", data, ossDataContent(uri, dataSize)); + } + + @Test + public void testInputFile() throws IOException { + String location = randomLocation(); + InputFile in = fileIO().newInputFile(location); + Assert.assertFalse("OSS file should not exist", in.exists()); + + int dataSize = 1024 * 10; + byte[] data = randomData(dataSize); + OutputFile out = fileIO().newOutputFile(location); + try (OutputStream os = out.createOrOverwrite(); InputStream is = new ByteArrayInputStream(data)) { + ByteStreams.copy(is, os); + } + + Assert.assertTrue("OSS file should exist", in.exists()); + Assert.assertEquals("Should have expected location", location, in.location()); + Assert.assertEquals("Should have expected length", dataSize, in.getLength()); + Assert.assertArrayEquals("Should have expected content", data, inFileContent(in, dataSize)); + } + + @Test + public void testDeleteFile() throws IOException { + String location = randomLocation(); + int dataSize = 1024 * 10; + byte[] data = randomData(dataSize); + OutputFile out = fileIO().newOutputFile(location); + try (OutputStream os = out.create(); InputStream is = new ByteArrayInputStream(data)) { + ByteStreams.copy(is, os); + } + + InputFile in = fileIO().newInputFile(location); + Assert.assertTrue("OSS file should exist", in.exists()); + fileIO().deleteFile(in); + Assert.assertFalse("OSS file should not exist", fileIO().newInputFile(location).exists()); + } + + @Test + public void testLoadFileIO() { + FileIO fileIO = CatalogUtil.loadFileIO(OSS_IMPL_CLASS, ImmutableMap.of(), conf); + Assert.assertTrue("Should be OSSFileIO", fileIO instanceof OSSFileIO); + + byte[] data = SerializationUtil.serializeToBytes(fileIO); + FileIO expectedFileIO = SerializationUtil.deserializeFromBytes(data); + Assert.assertTrue("The deserialized FileIO should be OSSFileIO", expectedFileIO instanceof OSSFileIO); + } + + @Test + public void serializeClient() throws URISyntaxException { + String endpoint = "iceberg-test-oss.aliyun.com"; + String accessKeyId = UUID.randomUUID().toString(); + String accessSecret = UUID.randomUUID().toString(); + SerializableSupplier pre = () -> new OSSClientBuilder().build(endpoint, accessKeyId, accessSecret); + + byte[] data = SerializationUtil.serializeToBytes(pre); + SerializableSupplier post = SerializationUtil.deserializeFromBytes(data); + + OSS client = post.get(); + Assert.assertTrue("Should be instance of oss client", client instanceof OSSClient); + + OSSClient oss = (OSSClient) client; + Assert.assertEquals("Should have expected endpoint", + new URI("http://" + endpoint), oss.getEndpoint()); + Assert.assertEquals("Should have expected access key", + accessKeyId, oss.getCredentialsProvider().getCredentials().getAccessKeyId()); + Assert.assertEquals("Should have expected secret key", + accessSecret, oss.getCredentialsProvider().getCredentials().getSecretAccessKey()); + } + + private String randomLocation() { + return location(String.format("%s.dat", UUID.randomUUID())); + } + + private byte[] randomData(int size) { + byte[] data = new byte[size]; + random.nextBytes(data); + return data; + } + + private long ossDataLength(OSSURI uri) { + return ossClient.getObject(uri.bucket(), uri.key()).getObjectMetadata().getContentLength(); + } + + private byte[] ossDataContent(OSSURI uri, int dataSize) throws IOException { + try (InputStream is = ossClient.getObject(uri.bucket(), uri.key()).getObjectContent()) { + byte[] actual = new byte[dataSize]; + ByteStreams.readFully(is, actual); + return actual; + } + } + + private byte[] inFileContent(InputFile in, int dataSize) throws IOException { + try (InputStream is = in.newStream()) { + byte[] actual = new byte[dataSize]; + ByteStreams.readFully(is, actual); + return actual; + } + } +} From a186f605bee872d5c5d0ba79762d81bbeb1576cd Mon Sep 17 00:00:00 2001 From: mikewu Date: Tue, 16 Nov 2021 14:46:55 +0800 Subject: [PATCH 2/8] rework --- .../aliyun/DefaultAliyunClientFactory.java | 4 +- .../apache/iceberg/aliyun/oss/OSSFileIO.java | 2 +- .../iceberg/aliyun/oss/AliyunOSSTestBase.java | 11 ---- .../iceberg/aliyun/oss/TestOSSFileIO.java | 54 ++++++++++++------- 4 files changed, 38 insertions(+), 33 deletions(-) diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java index 2d862de10d7e..523a91860090 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java @@ -29,8 +29,8 @@ public class DefaultAliyunClientFactory implements AliyunClientFactory { @Override public OSS oss() { - Preconditions.checkNotNull(aliyunProperties, - "Cannot create aliyun oss client before initializing the AliyunClientFactory."); + Preconditions.checkNotNull( + aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory."); return new OSSClientBuilder().build( aliyunProperties.ossEndpoint(), aliyunProperties.accessKeyId(), aliyunProperties.accessKeySecret()); diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java index 60fe8640b1e2..08a5a38c565c 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -30,7 +30,7 @@ import org.apache.iceberg.util.SerializableSupplier; /** - * FileIO implementation backend by OSS. + * FileIO implementation backed by OSS. *

* Locations used must follow the conventions for OSS URIs (e.g. oss://bucket/path...). * URIs with scheme https are also treated as oss file paths. diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java index 284bddd740a6..24e89bf8c2ef 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestBase.java @@ -20,7 +20,6 @@ package org.apache.iceberg.aliyun.oss; import com.aliyun.oss.OSS; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.util.SerializableSupplier; import org.junit.After; import org.junit.Before; @@ -34,20 +33,14 @@ public abstract class AliyunOSSTestBase { private final String bucketName = OSS_TEST_RULE.testBucketName(); private final String keyPrefix = OSS_TEST_RULE.keyPrefix(); - private OSSFileIO fileIO; - @Before public void before() { - fileIO = new OSSFileIO(ossClient); - OSS_TEST_RULE.setUpBucket(bucketName); } @After public void after() { OSS_TEST_RULE.tearDownBucket(bucketName); - - fileIO.close(); } protected String location(String key) { @@ -57,8 +50,4 @@ protected String location(String key) { protected SerializableSupplier ossClient() { return ossClient; } - - protected FileIO fileIO() { - return fileIO; - } } diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java index 278f67de55af..518d0b7710fc 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java @@ -40,15 +40,27 @@ import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; import org.apache.iceberg.util.SerializableSupplier; import org.apache.iceberg.util.SerializationUtil; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestOSSFileIO extends AliyunOSSTestBase { private static final String OSS_IMPL_CLASS = OSSFileIO.class.getName(); - - private final OSS ossClient = ossClient().get(); - private final Random random = ThreadLocalRandom.current(); private final Configuration conf = new Configuration(); + private final Random random = ThreadLocalRandom.current(); + + private FileIO fileIO; + + @Before + public void beforeFile() { + fileIO = new OSSFileIO(ossClient()); + } + + @After + public void afterFile() { + fileIO.close(); + } @Test public void testOutputFile() throws IOException { @@ -57,14 +69,12 @@ public void testOutputFile() throws IOException { byte[] data = randomData(dataSize); OutputFile out = fileIO().newOutputFile(location); - try (OutputStream os = out.create(); InputStream is = new ByteArrayInputStream(data)) { - ByteStreams.copy(is, os); - } + writeOSSData(out, data); OSSURI uri = new OSSURI(location); - Assert.assertTrue("OSS file should exist", ossClient.doesObjectExist(uri.bucket(), uri.key())); + Assert.assertTrue("OSS file should exist", ossClient().get().doesObjectExist(uri.bucket(), uri.key())); Assert.assertEquals("Should have expected location", location, out.location()); - Assert.assertEquals("Should have expected length", ossDataLength(uri), dataSize); + Assert.assertEquals("Should have expected length", dataSize, ossDataLength(uri)); Assert.assertArrayEquals("Should have expected content", data, ossDataContent(uri, dataSize)); } @@ -77,9 +87,7 @@ public void testInputFile() throws IOException { int dataSize = 1024 * 10; byte[] data = randomData(dataSize); OutputFile out = fileIO().newOutputFile(location); - try (OutputStream os = out.createOrOverwrite(); InputStream is = new ByteArrayInputStream(data)) { - ByteStreams.copy(is, os); - } + writeOSSData(out, data); Assert.assertTrue("OSS file should exist", in.exists()); Assert.assertEquals("Should have expected location", location, in.location()); @@ -93,9 +101,7 @@ public void testDeleteFile() throws IOException { int dataSize = 1024 * 10; byte[] data = randomData(dataSize); OutputFile out = fileIO().newOutputFile(location); - try (OutputStream os = out.create(); InputStream is = new ByteArrayInputStream(data)) { - ByteStreams.copy(is, os); - } + writeOSSData(out, data); InputFile in = fileIO().newInputFile(location); Assert.assertTrue("OSS file should exist", in.exists()); @@ -105,10 +111,10 @@ public void testDeleteFile() throws IOException { @Test public void testLoadFileIO() { - FileIO fileIO = CatalogUtil.loadFileIO(OSS_IMPL_CLASS, ImmutableMap.of(), conf); - Assert.assertTrue("Should be OSSFileIO", fileIO instanceof OSSFileIO); + FileIO file = CatalogUtil.loadFileIO(OSS_IMPL_CLASS, ImmutableMap.of(), conf); + Assert.assertTrue("Should be OSSFileIO", file instanceof OSSFileIO); - byte[] data = SerializationUtil.serializeToBytes(fileIO); + byte[] data = SerializationUtil.serializeToBytes(file); FileIO expectedFileIO = SerializationUtil.deserializeFromBytes(data); Assert.assertTrue("The deserialized FileIO should be OSSFileIO", expectedFileIO instanceof OSSFileIO); } @@ -135,6 +141,10 @@ public void serializeClient() throws URISyntaxException { accessSecret, oss.getCredentialsProvider().getCredentials().getSecretAccessKey()); } + private FileIO fileIO() { + return fileIO; + } + private String randomLocation() { return location(String.format("%s.dat", UUID.randomUUID())); } @@ -146,17 +156,23 @@ private byte[] randomData(int size) { } private long ossDataLength(OSSURI uri) { - return ossClient.getObject(uri.bucket(), uri.key()).getObjectMetadata().getContentLength(); + return ossClient().get().getObject(uri.bucket(), uri.key()).getObjectMetadata().getContentLength(); } private byte[] ossDataContent(OSSURI uri, int dataSize) throws IOException { - try (InputStream is = ossClient.getObject(uri.bucket(), uri.key()).getObjectContent()) { + try (InputStream is = ossClient().get().getObject(uri.bucket(), uri.key()).getObjectContent()) { byte[] actual = new byte[dataSize]; ByteStreams.readFully(is, actual); return actual; } } + private void writeOSSData(OutputFile out, byte[] data) throws IOException { + try (OutputStream os = out.create(); InputStream is = new ByteArrayInputStream(data)) { + ByteStreams.copy(is, os); + } + } + private byte[] inFileContent(InputFile in, int dataSize) throws IOException { try (InputStream is = in.newStream()) { byte[] actual = new byte[dataSize]; From d0c64059edb5ecf7e82a3392fa893b8c8b5ea286 Mon Sep 17 00:00:00 2001 From: mikewu Date: Wed, 17 Nov 2021 14:19:13 +0800 Subject: [PATCH 3/8] rework --- .../iceberg/aliyun/AliyunClientFactory.java | 39 ++++++++++--------- .../iceberg/aliyun/AliyunProperties.java | 5 +++ .../iceberg/aliyun/oss/BaseOSSFile.java | 4 -- .../iceberg/aliyun/oss/OSSInputFile.java | 13 ++++++- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java index a283d5cce3df..28d1742ef02a 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -25,27 +25,9 @@ import org.apache.iceberg.common.DynConstructors; public interface AliyunClientFactory extends Serializable { - /** - * Create an aliyun OSS client. - * - * @return oss client. - */ - OSS oss(); - - /** - * Initialize Aliyun client factory from catalog properties. - * - * @param properties catalog properties - */ - void initialize(Map properties); - - /** - * Returns an initialized {@link AliyunProperties} - */ - AliyunProperties aliyunProperties(); static AliyunClientFactory load(Map properties) { - String impl = properties.getOrDefault("client.factory", DefaultAliyunClientFactory.class.getName()); + String impl = properties.getOrDefault(AliyunProperties.CLIENT_FACTORY, DefaultAliyunClientFactory.class.getName()); return load(impl, properties); } @@ -76,4 +58,23 @@ static AliyunClientFactory load(String impl, Map properties) { factory.initialize(properties); return factory; } + + /** + * Create an aliyun OSS client. + * + * @return oss client. + */ + OSS oss(); + + /** + * Initialize Aliyun client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(Map properties); + + /** + * Returns an initialized {@link AliyunProperties} + */ + AliyunProperties aliyunProperties(); } diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java index 40ddc22b8c87..ad6b1a91949d 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java @@ -52,6 +52,11 @@ public class AliyunProperties implements Serializable { */ public static final String ACCESS_KEY_SECRET = "access.key.secret"; + /** + * Aliyun client factory for oss service, defaults to {@link DefaultAliyunClientFactory} implementation + */ + public static final String CLIENT_FACTORY = "client.factory"; + /** * Location to put staging files for uploading to OSS, defaults to the directory value of java.io.tmpdir. */ diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java index 165f72e6663c..a58727b0359f 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java @@ -32,10 +32,6 @@ abstract class BaseOSSFile { private AliyunProperties aliyunProperties; private SimplifiedObjectMeta metadata; - BaseOSSFile(OSS client, OSSURI uri) { - this(client, uri, new AliyunProperties()); - } - BaseOSSFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { this.client = client; this.uri = uri; diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java index 4137744fbd94..3e3f958765e3 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aliyun.oss; import com.aliyun.oss.OSS; +import org.apache.iceberg.aliyun.AliyunProperties; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; @@ -29,11 +30,19 @@ public class OSSInputFile extends BaseOSSFile implements InputFile { private Long length = null; OSSInputFile(OSS client, OSSURI uri) { - super(client, uri); + this(client, uri, new AliyunProperties()); } OSSInputFile(OSS client, OSSURI uri, long length) { - super(client, uri); + this(client, uri, new AliyunProperties(), length); + } + + OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { + super(client, uri, aliyunProperties); + } + + OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties, long length) { + super(client, uri, aliyunProperties); ValidationException.check(length >= 0, "Invalid file length: %s", length); this.length = length; } From 7b4ed4d88c11b3b1f69d9d5f547c9e86b7a589a1 Mon Sep 17 00:00:00 2001 From: mikewu Date: Wed, 17 Nov 2021 20:44:35 +0800 Subject: [PATCH 4/8] rework --- .../src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java | 2 +- .../main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java index 08a5a38c565c..c0d9268a9dc4 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -65,7 +65,7 @@ public OSSFileIO(SerializableSupplier oss) { @Override public InputFile newInputFile(String path) { - return new OSSInputFile(client(), new OSSURI(path)); + return new OSSInputFile(client(), new OSSURI(path), aliyunProperties); } @Override diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java index d5acde003390..2dc225cd76fa 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java @@ -52,6 +52,6 @@ public PositionOutputStream createOrOverwrite() { @Override public InputFile toInputFile() { - return new OSSInputFile(client(), uri()); + return new OSSInputFile(client(), uri(), aliyunProperties()); } } From e922abb7bfc51fc7c02c6b6ac9df651d362717a9 Mon Sep 17 00:00:00 2001 From: mikewu Date: Thu, 18 Nov 2021 19:26:51 +0800 Subject: [PATCH 5/8] rework --- .../apache/iceberg/aliyun/AliyunClientFactory.java | 2 +- .../org/apache/iceberg/aliyun/AliyunProperties.java | 6 ++++-- .../iceberg/aliyun/DefaultAliyunClientFactory.java | 2 +- .../org/apache/iceberg/aliyun/oss/OSSFileIO.java | 2 +- .../org/apache/iceberg/aliyun/oss/OSSInputFile.java | 8 -------- .../apache/iceberg/aliyun/oss/TestOSSInputFile.java | 13 ++++++++----- 6 files changed, 15 insertions(+), 18 deletions(-) diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java index 28d1742ef02a..4dd89df6bd03 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -64,7 +64,7 @@ static AliyunClientFactory load(String impl, Map properties) { * * @return oss client. */ - OSS oss(); + OSS newClient(); /** * Initialize Aliyun client factory from catalog properties. diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java index ad6b1a91949d..c4e4b73abe9a 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java @@ -53,9 +53,11 @@ public class AliyunProperties implements Serializable { public static final String ACCESS_KEY_SECRET = "access.key.secret"; /** - * Aliyun client factory for oss service, defaults to {@link DefaultAliyunClientFactory} implementation + * The implementation class of {@link AliyunClientFactory} to customize Aliyun client configurations. + * If set, all Aliyun clients will be initialized by the specified factory. + * If not set, {@link DefaultAliyunClientFactory} is used as default factory. */ - public static final String CLIENT_FACTORY = "client.factory"; + public static final String CLIENT_FACTORY = "client.factory-impl"; /** * Location to put staging files for uploading to OSS, defaults to the directory value of java.io.tmpdir. diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java index 523a91860090..dc696ec56c60 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java @@ -28,7 +28,7 @@ public class DefaultAliyunClientFactory implements AliyunClientFactory { private AliyunProperties aliyunProperties; @Override - public OSS oss() { + public OSS newClient() { Preconditions.checkNotNull( aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory."); diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java index c0d9268a9dc4..400c8e3b3eed 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -90,7 +90,7 @@ private OSS client() { public void initialize(Map properties) { AliyunClientFactory factory = AliyunClientFactory.load(properties); this.aliyunProperties = factory.aliyunProperties(); - this.oss = factory::oss; + this.oss = factory::newClient; } @Override diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java index 3e3f958765e3..b1ea6b04440b 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java @@ -29,14 +29,6 @@ public class OSSInputFile extends BaseOSSFile implements InputFile { private Long length = null; - OSSInputFile(OSS client, OSSURI uri) { - this(client, uri, new AliyunProperties()); - } - - OSSInputFile(OSS client, OSSURI uri, long length) { - this(client, uri, new AliyunProperties(), length); - } - OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { super(client, uri, aliyunProperties); } diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java index 304d07404fd5..c939f5deb68d 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.aliyun.AliyunProperties; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; @@ -43,6 +44,7 @@ public class TestOSSInputFile extends AliyunOSSTestBase { private final OSS ossClient = ossClient().get(); private final OSS ossMock = mock(OSS.class, delegatesTo(ossClient)); + private final AliyunProperties aliyunProperties = new AliyunProperties(); private final Random random = ThreadLocalRandom.current(); @Test @@ -60,14 +62,15 @@ public void testReadFile() throws Exception { public void testOSSInputFile() { OSSURI uri = randomURI(); AssertHelpers.assertThrows("File length should not be negative", ValidationException.class, - "Invalid file length", () -> new OSSInputFile(ossClient().get(), uri, -1)); + "Invalid file length", + () -> new OSSInputFile(ossClient().get(), uri, aliyunProperties, -1)); } @Test public void testExists() { OSSURI uri = randomURI(); - InputFile inputFile = new OSSInputFile(ossMock, uri); + InputFile inputFile = new OSSInputFile(ossMock, uri, aliyunProperties); Assert.assertFalse("OSS file should not exist", inputFile.exists()); verify(ossMock, times(1)).getSimplifiedObjectMeta(uri.bucket(), uri.key()); reset(ossMock); @@ -100,7 +103,7 @@ public void testGetLength() { } private void readAndVerify(OSSURI uri, byte[] data) throws IOException { - InputFile inputFile = new OSSInputFile(ossClient().get(), uri); + InputFile inputFile = new OSSInputFile(ossClient().get(), uri, aliyunProperties); Assert.assertTrue("OSS file should exist", inputFile.exists()); Assert.assertEquals("Should have expected file length", data.length, inputFile.getLength()); @@ -114,9 +117,9 @@ private void readAndVerify(OSSURI uri, byte[] data) throws IOException { private void verifyLength(OSS ossClientMock, OSSURI uri, byte[] data, boolean isCache) { InputFile inputFile; if (isCache) { - inputFile = new OSSInputFile(ossClientMock, uri, data.length); + inputFile = new OSSInputFile(ossClientMock, uri, aliyunProperties, data.length); } else { - inputFile = new OSSInputFile(ossClientMock, uri); + inputFile = new OSSInputFile(ossClientMock, uri, aliyunProperties); } inputFile.getLength(); Assert.assertEquals("Should have expected file length", data.length, inputFile.getLength()); From 694e1833bc4407adc7af0e1b48c294d11894b465 Mon Sep 17 00:00:00 2001 From: mikewu Date: Fri, 19 Nov 2021 16:50:24 +0800 Subject: [PATCH 6/8] rework --- .../iceberg/aliyun/AliyunClientFactories.java | 100 ++++++++++++++++++ .../iceberg/aliyun/AliyunClientFactory.java | 36 +------ .../iceberg/aliyun/AliyunProperties.java | 2 +- .../aliyun/DefaultAliyunClientFactory.java | 48 --------- .../apache/iceberg/aliyun/oss/OSSFileIO.java | 5 +- .../aliyun/TestAliyunClientFactories.java | 70 ++++++++++++ 6 files changed, 175 insertions(+), 86 deletions(-) create mode 100644 aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java delete mode 100644 aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java create mode 100644 aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java new file mode 100644 index 000000000000..612afe2b7029 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AliyunClientFactories { + + private static final AliyunClientFactory ALIYUN_CLIENT_FACTORY_DEFAULT = new DefaultAliyunClientFactory(); + + private AliyunClientFactories() { + } + + public static AliyunClientFactory defaultFactory() { + return ALIYUN_CLIENT_FACTORY_DEFAULT; + } + + public static AliyunClientFactory load(Map properties) { + if (properties.containsKey(AliyunProperties.CLIENT_FACTORY)) { + return load(properties.get(AliyunProperties.CLIENT_FACTORY), properties); + } else { + return defaultFactory(); + } + } + + /** + * Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it. + * + * @param impl the class name. + * @param properties to initialize the factory. + * @return an initialized {@link AliyunClientFactory}. + */ + private static AliyunClientFactory load(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e); + } + + AliyunClientFactory factory; + try { + factory = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e); + } + + factory.initialize(properties); + return factory; + } + + static class DefaultAliyunClientFactory implements AliyunClientFactory { + private AliyunProperties aliyunProperties; + + DefaultAliyunClientFactory() { + } + + @Override + public OSS ossClient() { + Preconditions.checkNotNull( + aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory."); + + return new OSSClientBuilder().build( + aliyunProperties.ossEndpoint(), aliyunProperties.accessKeyId(), aliyunProperties.accessKeySecret()); + } + + @Override + public void initialize(Map properties) { + this.aliyunProperties = new AliyunProperties(properties); + } + + @Override + public AliyunProperties aliyunProperties() { + return aliyunProperties; + } + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java index 4dd89df6bd03..77e174a940fa 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -22,49 +22,15 @@ import com.aliyun.oss.OSS; import java.io.Serializable; import java.util.Map; -import org.apache.iceberg.common.DynConstructors; public interface AliyunClientFactory extends Serializable { - static AliyunClientFactory load(Map properties) { - String impl = properties.getOrDefault(AliyunProperties.CLIENT_FACTORY, DefaultAliyunClientFactory.class.getName()); - return load(impl, properties); - } - - /** - * Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it. - * - * @param impl the class name. - * @param properties to initialize the factory. - * @return an initialized {@link AliyunClientFactory}. - */ - static AliyunClientFactory load(String impl, Map properties) { - DynConstructors.Ctor ctor; - try { - ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException(String.format( - "Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e); - } - - AliyunClientFactory factory; - try { - factory = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e); - } - - factory.initialize(properties); - return factory; - } - /** * Create an aliyun OSS client. * * @return oss client. */ - OSS newClient(); + OSS ossClient(); /** * Initialize Aliyun client factory from catalog properties. diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java index c4e4b73abe9a..5a04f2a47ce3 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java @@ -55,7 +55,7 @@ public class AliyunProperties implements Serializable { /** * The implementation class of {@link AliyunClientFactory} to customize Aliyun client configurations. * If set, all Aliyun clients will be initialized by the specified factory. - * If not set, {@link DefaultAliyunClientFactory} is used as default factory. + * If not set, {@link AliyunClientFactories#defaultFactory()} is used as default factory. */ public static final String CLIENT_FACTORY = "client.factory-impl"; diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java deleted file mode 100644 index dc696ec56c60..000000000000 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.aliyun; - -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import java.util.Map; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; - -public class DefaultAliyunClientFactory implements AliyunClientFactory { - private AliyunProperties aliyunProperties; - - @Override - public OSS newClient() { - Preconditions.checkNotNull( - aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory."); - - return new OSSClientBuilder().build( - aliyunProperties.ossEndpoint(), aliyunProperties.accessKeyId(), aliyunProperties.accessKeySecret()); - } - - @Override - public void initialize(Map properties) { - this.aliyunProperties = new AliyunProperties(properties); - } - - @Override - public AliyunProperties aliyunProperties() { - return aliyunProperties; - } -} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java index 400c8e3b3eed..5c07b0652193 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -22,6 +22,7 @@ import com.aliyun.oss.OSS; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.aliyun.AliyunClientFactories; import org.apache.iceberg.aliyun.AliyunClientFactory; import org.apache.iceberg.aliyun.AliyunProperties; import org.apache.iceberg.io.FileIO; @@ -88,9 +89,9 @@ private OSS client() { @Override public void initialize(Map properties) { - AliyunClientFactory factory = AliyunClientFactory.load(properties); + AliyunClientFactory factory = AliyunClientFactories.load(properties); this.aliyunProperties = factory.aliyunProperties(); - this.oss = factory::newClient; + this.oss = factory::ossClient; } @Override diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java new file mode 100644 index 000000000000..d20800dd79ae --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; + +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +public class TestAliyunClientFactories { + + @Test + public void testLoadDefault() { + Assert.assertEquals("Default client should be singleton", + AliyunClientFactories.defaultFactory(), AliyunClientFactories.defaultFactory()); + + Assert.assertTrue("Should load default when not configured", + AliyunClientFactories.load(Maps.newHashMap()) instanceof AliyunClientFactories.DefaultAliyunClientFactory); + } + + @Test + public void testLoadCustom() { + Map properties = Maps.newHashMap(); + properties.put(AliyunProperties.CLIENT_FACTORY, CustomFactory.class.getName()); + Assert.assertTrue("Should load custom class", + AliyunClientFactories.load(properties) instanceof CustomFactory); + } + + public static class CustomFactory implements AliyunClientFactory { + + AliyunProperties aliyunProperties; + public CustomFactory() { + } + + @Override + public OSS ossClient() { + return null; + } + + @Override + public void initialize(Map properties) { + this.aliyunProperties = new AliyunProperties(properties); + } + + @Override + public AliyunProperties aliyunProperties() { + return aliyunProperties; + } + } +} From effd07ab61604da240c2d690820843687b9ed30e Mon Sep 17 00:00:00 2001 From: mikewu Date: Fri, 19 Nov 2021 18:27:51 +0800 Subject: [PATCH 7/8] fix checkstyle --- .../apache/iceberg/aliyun/TestAliyunClientFactories.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java index d20800dd79ae..6bf21c21fee8 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java @@ -20,8 +20,6 @@ package org.apache.iceberg.aliyun; import com.aliyun.oss.OSS; - -import java.util.HashMap; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.Assert; @@ -34,7 +32,8 @@ public void testLoadDefault() { Assert.assertEquals("Default client should be singleton", AliyunClientFactories.defaultFactory(), AliyunClientFactories.defaultFactory()); - Assert.assertTrue("Should load default when not configured", + Assert.assertTrue( + "Should load default when not configured", AliyunClientFactories.load(Maps.newHashMap()) instanceof AliyunClientFactories.DefaultAliyunClientFactory); } @@ -42,13 +41,15 @@ public void testLoadDefault() { public void testLoadCustom() { Map properties = Maps.newHashMap(); properties.put(AliyunProperties.CLIENT_FACTORY, CustomFactory.class.getName()); - Assert.assertTrue("Should load custom class", + Assert.assertTrue( + "Should load custom class", AliyunClientFactories.load(properties) instanceof CustomFactory); } public static class CustomFactory implements AliyunClientFactory { AliyunProperties aliyunProperties; + public CustomFactory() { } From bd2571cdfd45dad9f88d30c242ef675b9eeb18d1 Mon Sep 17 00:00:00 2001 From: mikewu Date: Mon, 22 Nov 2021 14:46:59 +0800 Subject: [PATCH 8/8] rename ossClient as newOSSClient --- .../java/org/apache/iceberg/aliyun/AliyunClientFactories.java | 2 +- .../java/org/apache/iceberg/aliyun/AliyunClientFactory.java | 2 +- .../src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java | 2 +- .../org/apache/iceberg/aliyun/TestAliyunClientFactories.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java index 612afe2b7029..d66a6783e2a2 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java @@ -79,7 +79,7 @@ static class DefaultAliyunClientFactory implements AliyunClientFactory { } @Override - public OSS ossClient() { + public OSS newOSSClient() { Preconditions.checkNotNull( aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory."); diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java index 77e174a940fa..6657a2234f85 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -30,7 +30,7 @@ public interface AliyunClientFactory extends Serializable { * * @return oss client. */ - OSS ossClient(); + OSS newOSSClient(); /** * Initialize Aliyun client factory from catalog properties. diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java index 5c07b0652193..e45944c6e95c 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -91,7 +91,7 @@ private OSS client() { public void initialize(Map properties) { AliyunClientFactory factory = AliyunClientFactories.load(properties); this.aliyunProperties = factory.aliyunProperties(); - this.oss = factory::ossClient; + this.oss = factory::newOSSClient; } @Override diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java index 6bf21c21fee8..675ccb6090df 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java @@ -54,7 +54,7 @@ public CustomFactory() { } @Override - public OSS ossClient() { + public OSS newOSSClient() { return null; }