diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java new file mode 100644 index 000000000000..d66a6783e2a2 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactories.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class AliyunClientFactories { + + private static final AliyunClientFactory ALIYUN_CLIENT_FACTORY_DEFAULT = new DefaultAliyunClientFactory(); + + private AliyunClientFactories() { + } + + public static AliyunClientFactory defaultFactory() { + return ALIYUN_CLIENT_FACTORY_DEFAULT; + } + + public static AliyunClientFactory load(Map properties) { + if (properties.containsKey(AliyunProperties.CLIENT_FACTORY)) { + return load(properties.get(AliyunProperties.CLIENT_FACTORY), properties); + } else { + return defaultFactory(); + } + } + + /** + * Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it. + * + * @param impl the class name. + * @param properties to initialize the factory. + * @return an initialized {@link AliyunClientFactory}. + */ + private static AliyunClientFactory load(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e); + } + + AliyunClientFactory factory; + try { + factory = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e); + } + + factory.initialize(properties); + return factory; + } + + static class DefaultAliyunClientFactory implements AliyunClientFactory { + private AliyunProperties aliyunProperties; + + DefaultAliyunClientFactory() { + } + + @Override + public OSS newOSSClient() { + Preconditions.checkNotNull( + aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory."); + + return new OSSClientBuilder().build( + aliyunProperties.ossEndpoint(), aliyunProperties.accessKeyId(), aliyunProperties.accessKeySecret()); + } + + @Override + public void initialize(Map properties) { + this.aliyunProperties = new AliyunProperties(properties); + } + + @Override + public AliyunProperties aliyunProperties() { + return aliyunProperties; + } + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java new file mode 100644 index 000000000000..6657a2234f85 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import java.io.Serializable; +import java.util.Map; + +public interface AliyunClientFactory extends Serializable { + + /** + * Create an aliyun OSS client. + * + * @return oss client. + */ + OSS newOSSClient(); + + /** + * Initialize Aliyun client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(Map properties); + + /** + * Returns an initialized {@link AliyunProperties} + */ + AliyunProperties aliyunProperties(); +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java index e63940281d01..5a04f2a47ce3 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java @@ -25,10 +25,48 @@ import org.apache.iceberg.util.PropertyUtil; public class AliyunProperties implements Serializable { + /** + * The domain name used to access OSS. OSS uses HTTP Restful APIs to provide services. Different regions are accessed + * by using different endpoints. For the same region, access over the internal network or over the Internet also uses + * different endpoints. For more information, see: + * https://www.alibabacloud.com/help/doc-detail/31837.htm + */ + public static final String OSS_ENDPOINT = "oss.endpoint"; + + /** + * Aliyun uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric + * encryption and verify the identity of a requester. The AccessKey ID is used to identify a user. + *

+ * For more information about how to obtain an AccessKey pair, see: + * https://www.alibabacloud.com/help/doc-detail/53045.htm + */ + public static final String ACCESS_KEY_ID = "access.key.id"; + + /** + * Aliyun uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric + * encryption and verify the identity of a requester. The AccessKey secret is used to encrypt and verify the + * signature string. + *

+ * For more information about how to obtain an AccessKey pair, see: + * https://www.alibabacloud.com/help/doc-detail/53045.htm + */ + public static final String ACCESS_KEY_SECRET = "access.key.secret"; + + /** + * The implementation class of {@link AliyunClientFactory} to customize Aliyun client configurations. + * If set, all Aliyun clients will be initialized by the specified factory. + * If not set, {@link AliyunClientFactories#defaultFactory()} is used as default factory. + */ + public static final String CLIENT_FACTORY = "client.factory-impl"; + /** * Location to put staging files for uploading to OSS, defaults to the directory value of java.io.tmpdir. */ public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir"; + + private final String ossEndpoint; + private final String accessKeyId; + private final String accessKeySecret; private final String ossStagingDirectory; public AliyunProperties() { @@ -36,10 +74,27 @@ public AliyunProperties() { } public AliyunProperties(Map properties) { + // OSS endpoint, accessKeyId, accessKeySecret. + this.ossEndpoint = properties.get(OSS_ENDPOINT); + this.accessKeyId = properties.get(ACCESS_KEY_ID); + this.accessKeySecret = properties.get(ACCESS_KEY_SECRET); + this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY, System.getProperty("java.io.tmpdir")); } + public String ossEndpoint() { + return ossEndpoint; + } + + public String accessKeyId() { + return accessKeyId; + } + + public String accessKeySecret() { + return accessKeySecret; + } + public String ossStagingDirectory() { return ossStagingDirectory; } diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java index 165f72e6663c..a58727b0359f 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java @@ -32,10 +32,6 @@ abstract class BaseOSSFile { private AliyunProperties aliyunProperties; private SimplifiedObjectMeta metadata; - BaseOSSFile(OSS client, OSSURI uri) { - this(client, uri, new AliyunProperties()); - } - BaseOSSFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { this.client = client; this.uri = uri; diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java new file mode 100644 index 000000000000..e45944c6e95c --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.aliyun.AliyunClientFactories; +import org.apache.iceberg.aliyun.AliyunClientFactory; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.SerializableSupplier; + +/** + * FileIO implementation backed by OSS. + *

+ * Locations used must follow the conventions for OSS URIs (e.g. oss://bucket/path...). + * URIs with scheme https are also treated as oss file paths. + * Using this FileIO with other schemes with result in {@link org.apache.iceberg.exceptions.ValidationException} + */ +public class OSSFileIO implements FileIO { + + private SerializableSupplier oss; + private AliyunProperties aliyunProperties; + private transient OSS client; + private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); + + /** + * No-arg constructor to load the FileIO dynamically. + *

+ * All fields are initialized by calling {@link OSSFileIO#initialize(Map)} later. + */ + public OSSFileIO() { + } + + /** + * Constructor with custom oss supplier and default aliyun properties. + *

+ * Calling {@link OSSFileIO#initialize(Map)} will overwrite information set in this constructor. + * + * @param oss oss supplier + */ + public OSSFileIO(SerializableSupplier oss) { + this.oss = oss; + this.aliyunProperties = new AliyunProperties(); + } + + @Override + public InputFile newInputFile(String path) { + return new OSSInputFile(client(), new OSSURI(path), aliyunProperties); + } + + @Override + public OutputFile newOutputFile(String path) { + return new OSSOutputFile(client(), new OSSURI(path), aliyunProperties); + } + + @Override + public void deleteFile(String path) { + OSSURI location = new OSSURI(path); + client().deleteObject(location.bucket(), location.key()); + } + + private OSS client() { + if (client == null) { + client = oss.get(); + } + return client; + } + + @Override + public void initialize(Map properties) { + AliyunClientFactory factory = AliyunClientFactories.load(properties); + this.aliyunProperties = factory.aliyunProperties(); + this.oss = factory::newOSSClient; + } + + @Override + public void close() { + // handles concurrent calls to close() + if (isResourceClosed.compareAndSet(false, true)) { + if (client != null) { + client.shutdown(); + } + } + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java index 4137744fbd94..b1ea6b04440b 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aliyun.oss; import com.aliyun.oss.OSS; +import org.apache.iceberg.aliyun.AliyunProperties; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; @@ -28,12 +29,12 @@ public class OSSInputFile extends BaseOSSFile implements InputFile { private Long length = null; - OSSInputFile(OSS client, OSSURI uri) { - super(client, uri); + OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { + super(client, uri, aliyunProperties); } - OSSInputFile(OSS client, OSSURI uri, long length) { - super(client, uri); + OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties, long length) { + super(client, uri, aliyunProperties); ValidationException.check(length >= 0, "Invalid file length: %s", length); this.length = length; } diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java index d5acde003390..2dc225cd76fa 100644 --- a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java @@ -52,6 +52,6 @@ public PositionOutputStream createOrOverwrite() { @Override public InputFile toInputFile() { - return new OSSInputFile(client(), uri()); + return new OSSInputFile(client(), uri(), aliyunProperties()); } } diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java new file mode 100644 index 000000000000..675ccb6090df --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactories.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +public class TestAliyunClientFactories { + + @Test + public void testLoadDefault() { + Assert.assertEquals("Default client should be singleton", + AliyunClientFactories.defaultFactory(), AliyunClientFactories.defaultFactory()); + + Assert.assertTrue( + "Should load default when not configured", + AliyunClientFactories.load(Maps.newHashMap()) instanceof AliyunClientFactories.DefaultAliyunClientFactory); + } + + @Test + public void testLoadCustom() { + Map properties = Maps.newHashMap(); + properties.put(AliyunProperties.CLIENT_FACTORY, CustomFactory.class.getName()); + Assert.assertTrue( + "Should load custom class", + AliyunClientFactories.load(properties) instanceof CustomFactory); + } + + public static class CustomFactory implements AliyunClientFactory { + + AliyunProperties aliyunProperties; + + public CustomFactory() { + } + + @Override + public OSS newOSSClient() { + return null; + } + + @Override + public void initialize(Map properties) { + this.aliyunProperties = new AliyunProperties(properties); + } + + @Override + public AliyunProperties aliyunProperties() { + return aliyunProperties; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java new file mode 100644 index 000000000000..518d0b7710fc --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSClientBuilder; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.apache.iceberg.util.SerializableSupplier; +import org.apache.iceberg.util.SerializationUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestOSSFileIO extends AliyunOSSTestBase { + private static final String OSS_IMPL_CLASS = OSSFileIO.class.getName(); + private final Configuration conf = new Configuration(); + private final Random random = ThreadLocalRandom.current(); + + private FileIO fileIO; + + @Before + public void beforeFile() { + fileIO = new OSSFileIO(ossClient()); + } + + @After + public void afterFile() { + fileIO.close(); + } + + @Test + public void testOutputFile() throws IOException { + String location = randomLocation(); + int dataSize = 1024 * 10; + byte[] data = randomData(dataSize); + + OutputFile out = fileIO().newOutputFile(location); + writeOSSData(out, data); + + OSSURI uri = new OSSURI(location); + Assert.assertTrue("OSS file should exist", ossClient().get().doesObjectExist(uri.bucket(), uri.key())); + Assert.assertEquals("Should have expected location", location, out.location()); + Assert.assertEquals("Should have expected length", dataSize, ossDataLength(uri)); + Assert.assertArrayEquals("Should have expected content", data, ossDataContent(uri, dataSize)); + } + + @Test + public void testInputFile() throws IOException { + String location = randomLocation(); + InputFile in = fileIO().newInputFile(location); + Assert.assertFalse("OSS file should not exist", in.exists()); + + int dataSize = 1024 * 10; + byte[] data = randomData(dataSize); + OutputFile out = fileIO().newOutputFile(location); + writeOSSData(out, data); + + Assert.assertTrue("OSS file should exist", in.exists()); + Assert.assertEquals("Should have expected location", location, in.location()); + Assert.assertEquals("Should have expected length", dataSize, in.getLength()); + Assert.assertArrayEquals("Should have expected content", data, inFileContent(in, dataSize)); + } + + @Test + public void testDeleteFile() throws IOException { + String location = randomLocation(); + int dataSize = 1024 * 10; + byte[] data = randomData(dataSize); + OutputFile out = fileIO().newOutputFile(location); + writeOSSData(out, data); + + InputFile in = fileIO().newInputFile(location); + Assert.assertTrue("OSS file should exist", in.exists()); + fileIO().deleteFile(in); + Assert.assertFalse("OSS file should not exist", fileIO().newInputFile(location).exists()); + } + + @Test + public void testLoadFileIO() { + FileIO file = CatalogUtil.loadFileIO(OSS_IMPL_CLASS, ImmutableMap.of(), conf); + Assert.assertTrue("Should be OSSFileIO", file instanceof OSSFileIO); + + byte[] data = SerializationUtil.serializeToBytes(file); + FileIO expectedFileIO = SerializationUtil.deserializeFromBytes(data); + Assert.assertTrue("The deserialized FileIO should be OSSFileIO", expectedFileIO instanceof OSSFileIO); + } + + @Test + public void serializeClient() throws URISyntaxException { + String endpoint = "iceberg-test-oss.aliyun.com"; + String accessKeyId = UUID.randomUUID().toString(); + String accessSecret = UUID.randomUUID().toString(); + SerializableSupplier pre = () -> new OSSClientBuilder().build(endpoint, accessKeyId, accessSecret); + + byte[] data = SerializationUtil.serializeToBytes(pre); + SerializableSupplier post = SerializationUtil.deserializeFromBytes(data); + + OSS client = post.get(); + Assert.assertTrue("Should be instance of oss client", client instanceof OSSClient); + + OSSClient oss = (OSSClient) client; + Assert.assertEquals("Should have expected endpoint", + new URI("http://" + endpoint), oss.getEndpoint()); + Assert.assertEquals("Should have expected access key", + accessKeyId, oss.getCredentialsProvider().getCredentials().getAccessKeyId()); + Assert.assertEquals("Should have expected secret key", + accessSecret, oss.getCredentialsProvider().getCredentials().getSecretAccessKey()); + } + + private FileIO fileIO() { + return fileIO; + } + + private String randomLocation() { + return location(String.format("%s.dat", UUID.randomUUID())); + } + + private byte[] randomData(int size) { + byte[] data = new byte[size]; + random.nextBytes(data); + return data; + } + + private long ossDataLength(OSSURI uri) { + return ossClient().get().getObject(uri.bucket(), uri.key()).getObjectMetadata().getContentLength(); + } + + private byte[] ossDataContent(OSSURI uri, int dataSize) throws IOException { + try (InputStream is = ossClient().get().getObject(uri.bucket(), uri.key()).getObjectContent()) { + byte[] actual = new byte[dataSize]; + ByteStreams.readFully(is, actual); + return actual; + } + } + + private void writeOSSData(OutputFile out, byte[] data) throws IOException { + try (OutputStream os = out.create(); InputStream is = new ByteArrayInputStream(data)) { + ByteStreams.copy(is, os); + } + } + + private byte[] inFileContent(InputFile in, int dataSize) throws IOException { + try (InputStream is = in.newStream()) { + byte[] actual = new byte[dataSize]; + ByteStreams.readFully(is, actual); + return actual; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java index 304d07404fd5..c939f5deb68d 100644 --- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputFile.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.aliyun.AliyunProperties; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; @@ -43,6 +44,7 @@ public class TestOSSInputFile extends AliyunOSSTestBase { private final OSS ossClient = ossClient().get(); private final OSS ossMock = mock(OSS.class, delegatesTo(ossClient)); + private final AliyunProperties aliyunProperties = new AliyunProperties(); private final Random random = ThreadLocalRandom.current(); @Test @@ -60,14 +62,15 @@ public void testReadFile() throws Exception { public void testOSSInputFile() { OSSURI uri = randomURI(); AssertHelpers.assertThrows("File length should not be negative", ValidationException.class, - "Invalid file length", () -> new OSSInputFile(ossClient().get(), uri, -1)); + "Invalid file length", + () -> new OSSInputFile(ossClient().get(), uri, aliyunProperties, -1)); } @Test public void testExists() { OSSURI uri = randomURI(); - InputFile inputFile = new OSSInputFile(ossMock, uri); + InputFile inputFile = new OSSInputFile(ossMock, uri, aliyunProperties); Assert.assertFalse("OSS file should not exist", inputFile.exists()); verify(ossMock, times(1)).getSimplifiedObjectMeta(uri.bucket(), uri.key()); reset(ossMock); @@ -100,7 +103,7 @@ public void testGetLength() { } private void readAndVerify(OSSURI uri, byte[] data) throws IOException { - InputFile inputFile = new OSSInputFile(ossClient().get(), uri); + InputFile inputFile = new OSSInputFile(ossClient().get(), uri, aliyunProperties); Assert.assertTrue("OSS file should exist", inputFile.exists()); Assert.assertEquals("Should have expected file length", data.length, inputFile.getLength()); @@ -114,9 +117,9 @@ private void readAndVerify(OSSURI uri, byte[] data) throws IOException { private void verifyLength(OSS ossClientMock, OSSURI uri, byte[] data, boolean isCache) { InputFile inputFile; if (isCache) { - inputFile = new OSSInputFile(ossClientMock, uri, data.length); + inputFile = new OSSInputFile(ossClientMock, uri, aliyunProperties, data.length); } else { - inputFile = new OSSInputFile(ossClientMock, uri); + inputFile = new OSSInputFile(ossClientMock, uri, aliyunProperties); } inputFile.getLength(); Assert.assertEquals("Should have expected file length", data.length, inputFile.getLength());