Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun;

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import java.util.Map;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class AliyunClientFactories {

private static final AliyunClientFactory ALIYUN_CLIENT_FACTORY_DEFAULT = new DefaultAliyunClientFactory();

private AliyunClientFactories() {
}

public static AliyunClientFactory defaultFactory() {
return ALIYUN_CLIENT_FACTORY_DEFAULT;
}

public static AliyunClientFactory load(Map<String, String> properties) {
if (properties.containsKey(AliyunProperties.CLIENT_FACTORY)) {
return load(properties.get(AliyunProperties.CLIENT_FACTORY), properties);
} else {
return defaultFactory();
}
}

/**
* Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it.
*
* @param impl the class name.
* @param properties to initialize the factory.
* @return an initialized {@link AliyunClientFactory}.
*/
private static AliyunClientFactory load(String impl, Map<String, String> properties) {
DynConstructors.Ctor<AliyunClientFactory> ctor;
try {
ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(String.format(
"Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e);
}

AliyunClientFactory factory;
try {
factory = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e);
}

factory.initialize(properties);
return factory;
}

static class DefaultAliyunClientFactory implements AliyunClientFactory {
private AliyunProperties aliyunProperties;

DefaultAliyunClientFactory() {
}

@Override
public OSS newOSSClient() {
Preconditions.checkNotNull(
aliyunProperties, "Cannot create aliyun oss client before initializing the AliyunClientFactory.");

return new OSSClientBuilder().build(
aliyunProperties.ossEndpoint(), aliyunProperties.accessKeyId(), aliyunProperties.accessKeySecret());
}

@Override
public void initialize(Map<String, String> properties) {
this.aliyunProperties = new AliyunProperties(properties);
}

@Override
public AliyunProperties aliyunProperties() {
return aliyunProperties;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun;

import com.aliyun.oss.OSS;
import java.io.Serializable;
import java.util.Map;

public interface AliyunClientFactory extends Serializable {

/**
* Create an aliyun OSS client.
*
* @return oss client.
*/
OSS newOSSClient();

/**
* Initialize Aliyun client factory from catalog properties.
*
* @param properties catalog properties
*/
void initialize(Map<String, String> properties);

/**
* Returns an initialized {@link AliyunProperties}
*/
AliyunProperties aliyunProperties();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,76 @@
import org.apache.iceberg.util.PropertyUtil;

public class AliyunProperties implements Serializable {
/**
* The domain name used to access OSS. OSS uses HTTP Restful APIs to provide services. Different regions are accessed
* by using different endpoints. For the same region, access over the internal network or over the Internet also uses
* different endpoints. For more information, see:
* https://www.alibabacloud.com/help/doc-detail/31837.htm
*/
public static final String OSS_ENDPOINT = "oss.endpoint";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally try to avoid nesting in places that have a small set of specific properties. Is it possible to make these shorter and more standard?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we will introduce the aliyun catalog services named DLF in future, the DLF services also has its endpoint. we need to prefix oss. and dlf. to distingush it's an oss endpoint or dlf endpoint. That's why we use oss.endpoint here.


/**
* Aliyun uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric
* encryption and verify the identity of a requester. The AccessKey ID is used to identify a user.
* <p>
* For more information about how to obtain an AccessKey pair, see:
* https://www.alibabacloud.com/help/doc-detail/53045.htm
*/
public static final String ACCESS_KEY_ID = "access.key.id";

/**
* Aliyun uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric
* encryption and verify the identity of a requester. The AccessKey secret is used to encrypt and verify the
* signature string.
* <p>
* For more information about how to obtain an AccessKey pair, see:
* https://www.alibabacloud.com/help/doc-detail/53045.htm
*/
public static final String ACCESS_KEY_SECRET = "access.key.secret";

/**
* The implementation class of {@link AliyunClientFactory} to customize Aliyun client configurations.
* If set, all Aliyun clients will be initialized by the specified factory.
* If not set, {@link AliyunClientFactories#defaultFactory()} is used as default factory.
*/
public static final String CLIENT_FACTORY = "client.factory-impl";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if they were the same, but I like that this conforms to the -impl convention that we use for dynamically loaded classes elsewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can make another PR to line up the behavior of aws part as "client.factory-impl"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, that works for me as well


/**
* Location to put staging files for uploading to OSS, defaults to the directory value of java.io.tmpdir.
*/
public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir";

private final String ossEndpoint;
private final String accessKeyId;
private final String accessKeySecret;
private final String ossStagingDirectory;

public AliyunProperties() {
this(ImmutableMap.of());
}

public AliyunProperties(Map<String, String> properties) {
// OSS endpoint, accessKeyId, accessKeySecret.
this.ossEndpoint = properties.get(OSS_ENDPOINT);
this.accessKeyId = properties.get(ACCESS_KEY_ID);
this.accessKeySecret = properties.get(ACCESS_KEY_SECRET);

this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY,
System.getProperty("java.io.tmpdir"));
}

public String ossEndpoint() {
return ossEndpoint;
}

public String accessKeyId() {
return accessKeyId;
}

public String accessKeySecret() {
return accessKeySecret;
}

public String ossStagingDirectory() {
return ossStagingDirectory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ abstract class BaseOSSFile {
private AliyunProperties aliyunProperties;
private SimplifiedObjectMeta metadata;

BaseOSSFile(OSS client, OSSURI uri) {
this(client, uri, new AliyunProperties());
}

BaseOSSFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) {
this.client = client;
this.uri = uri;
Expand Down
106 changes: 106 additions & 0 deletions aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun.oss;

import com.aliyun.oss.OSS;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.aliyun.AliyunClientFactories;
import org.apache.iceberg.aliyun.AliyunClientFactory;
import org.apache.iceberg.aliyun.AliyunProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.SerializableSupplier;

/**
* FileIO implementation backed by OSS.
* <p>
* Locations used must follow the conventions for OSS URIs (e.g. oss://bucket/path...).
* URIs with scheme https are also treated as oss file paths.
* Using this FileIO with other schemes with result in {@link org.apache.iceberg.exceptions.ValidationException}
*/
public class OSSFileIO implements FileIO {

private SerializableSupplier<OSS> oss;
private AliyunProperties aliyunProperties;
private transient OSS client;
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);

/**
* No-arg constructor to load the FileIO dynamically.
* <p>
* All fields are initialized by calling {@link OSSFileIO#initialize(Map)} later.
*/
public OSSFileIO() {
}

/**
* Constructor with custom oss supplier and default aliyun properties.
* <p>
* Calling {@link OSSFileIO#initialize(Map)} will overwrite information set in this constructor.
*
* @param oss oss supplier
*/
public OSSFileIO(SerializableSupplier<OSS> oss) {
this.oss = oss;
this.aliyunProperties = new AliyunProperties();
}

@Override
public InputFile newInputFile(String path) {
return new OSSInputFile(client(), new OSSURI(path), aliyunProperties);
}

@Override
public OutputFile newOutputFile(String path) {
return new OSSOutputFile(client(), new OSSURI(path), aliyunProperties);
}

@Override
public void deleteFile(String path) {
OSSURI location = new OSSURI(path);
client().deleteObject(location.bucket(), location.key());
}

private OSS client() {
if (client == null) {
client = oss.get();
}
return client;
}

@Override
public void initialize(Map<String, String> properties) {
AliyunClientFactory factory = AliyunClientFactories.load(properties);
this.aliyunProperties = factory.aliyunProperties();
this.oss = factory::newOSSClient;
}

@Override
public void close() {
// handles concurrent calls to close()
if (isResourceClosed.compareAndSet(false, true)) {
if (client != null) {
client.shutdown();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.aliyun.oss;

import com.aliyun.oss.OSS;
import org.apache.iceberg.aliyun.AliyunProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
Expand All @@ -28,12 +29,12 @@ public class OSSInputFile extends BaseOSSFile implements InputFile {

private Long length = null;

OSSInputFile(OSS client, OSSURI uri) {
super(client, uri);
OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) {
super(client, uri, aliyunProperties);
}

OSSInputFile(OSS client, OSSURI uri, long length) {
super(client, uri);
OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties, long length) {
super(client, uri, aliyunProperties);
ValidationException.check(length >= 0, "Invalid file length: %s", length);
this.length = length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ public PositionOutputStream createOrOverwrite() {

@Override
public InputFile toInputFile() {
return new OSSInputFile(client(), uri());
return new OSSInputFile(client(), uri(), aliyunProperties());
}
}
Loading