Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun;

import com.aliyun.oss.OSS;
import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.common.DynConstructors;

/**
* Interface to customize OSS clients used by Iceberg.
* A custom factory must have a no-arg constructor, and use {@link #initialize(Map)} to initialize the factory.
*/
public interface AliyunClientFactory extends Serializable {
/**
* Create an aliyun OSS client.
*
* @return oss client.
*/
OSS oss();

/**
* Initialize Aliyun client factory from catalog properties.
*
* @param properties catalog properties
*/
void initialize(Map<String, String> properties);

/**
* Returns an initialized {@link AliyunProperties}
*/
AliyunProperties aliyunProperties();

static AliyunClientFactory load(Map<String, String> properties) {
String impl = properties.getOrDefault("client.factory", DefaultAliyunClientFactory.class.getName());
return load(impl, properties);
}

/**
* Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it.
*
* @param impl the class name.
* @param properties to initialize the factory.
* @return an initialized {@link AliyunClientFactory}.
*/
static AliyunClientFactory load(String impl, Map<String, String> properties) {
DynConstructors.Ctor<AliyunClientFactory> ctor;
try {
ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(String.format(
"Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e);
}

AliyunClientFactory factory;
try {
factory = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e);
}

factory.initialize(properties);
return factory;
}
}
165 changes: 165 additions & 0 deletions aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun;

import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

public class AliyunProperties implements Serializable {

/**
* The domain name used to access OSS. OSS uses HTTP Restful APIs to provide services. Different regions are accessed
* by using different endpoints. For the same region, access over the internal network or over the Internet also uses
* different endpoints. For more information, see:
* https://www.alibabacloud.com/help/doc-detail/31837.htm
*/
public static final String OSS_ENDPOINT = "oss.endpoint";

/**
* OSS uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric
* encryption and verify the identity of a requester. The AccessKey ID is used to identify a user.
* <p>
* For more information about how to obtain an AccessKey pair, see:
* https://www.alibabacloud.com/help/doc-detail/53045.htm
*/
public static final String OSS_ACCESS_KEY_ID = "oss.access.key.id";

/**
* OSS uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric
* encryption and verify the identity of a requester. The AccessKey secret is used to encrypt and verify the
* signature string.
* <p>
* For more information about how to obtain an AccessKey pair, see:
* https://www.alibabacloud.com/help/doc-detail/53045.htm
*/
public static final String OSS_ACCESS_KEY_SECRET = "oss.access.key.secret";

/**
* Number of threads to use for uploading parts to OSS (shared pool across all output streams),
* default to {@link Runtime#availableProcessors()}
*/
public static final String OSS_MULTIPART_UPLOAD_THREADS = "oss.multipart.num-threads";

/**
* The size of a single part for multipart update requests in bytes (default: 100KB) based on the OSS requirement,
* the part size must be at least 100KB. To ensure performance of the reader and writer, the part size must be less
* than 5GB.
* <p>
* For more details, please see: https://www.alibabacloud.com/help/doc-detail/31850.htm
*/
public static final String OSS_MULTIPART_SIZE = "oss.multipart.part-size-bytes";
public static final long OSS_MULTIPART_SIZE_DEFAULT = 100 * 1024;
public static final long OSS_MULTIPART_SIZE_MIN = 100 * 1024;
public static final long OSS_MULTIPART_SIZE_MAX = 5 * 1024 * 1024 * 1024L;

/**
* Location to put staging files for uploading to OSS, default to temp directory set in java.io.tmpdir.
*/
public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir";

/**
* The threshold expressed as a object byte size at which to switch from uploading using a single put object request
* to uploading using multipart.
*/
public static final String OSS_MULTIPART_THRESHOLD_SIZE = "oss.multipart.threshold.size-bytes";
public static final long OSS_MULTIPART_THRESHOLD_SIZE_DEFAULT = 5 * 1024 * 1024 * 1024L;

private final String ossEndpoint;
private final String ossAccessKeyId;
private final String ossAccessKeySecret;
private final int ossMultipartUploadThreads;
private final long ossMultiPartSize;
private final String ossStagingDirectory;
private final long ossMultipartThresholdSize;

public AliyunProperties() {
this(ImmutableMap.of());
}

public AliyunProperties(Map<String, String> properties) {
Preconditions.checkNotNull(properties, "Aliyun properties should not be null");

// OSS endpoint, accessKeyId, accessKeySecret.
this.ossEndpoint = properties.get(OSS_ENDPOINT);
this.ossAccessKeyId = properties.get(OSS_ACCESS_KEY_ID);
this.ossAccessKeySecret = properties.get(OSS_ACCESS_KEY_SECRET);

// OSS multipart upload threads.
this.ossMultipartUploadThreads = PropertyUtil.propertyAsInt(properties, OSS_MULTIPART_UPLOAD_THREADS,
Runtime.getRuntime().availableProcessors());
Preconditions.checkArgument(ossMultipartUploadThreads > 0, "%s must be positive.", OSS_MULTIPART_THRESHOLD_SIZE);

// OOS multiple part size.
try {
this.ossMultiPartSize = PropertyUtil.propertyAsLong(properties,
OSS_MULTIPART_SIZE, OSS_MULTIPART_SIZE_DEFAULT);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Input malformed or exceed maximum multipart upload size 5GB: %s" + properties.get(OSS_MULTIPART_SIZE));
}
Preconditions.checkArgument(ossMultiPartSize >= OSS_MULTIPART_SIZE_MIN,
"Minimum multipart upload object size must be larger than 100KB.");
Preconditions.checkArgument(ossMultiPartSize <= OSS_MULTIPART_SIZE_MAX,
"Maximum multipart upload object size must be less than 5GB.");

// OSS staging directory.
this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY,
System.getProperty("java.io.tmpdir"));

// OSS threshold to use multipart upload.
this.ossMultipartThresholdSize = PropertyUtil.propertyAsLong(properties,
OSS_MULTIPART_THRESHOLD_SIZE, OSS_MULTIPART_THRESHOLD_SIZE_DEFAULT);
Preconditions.checkArgument(ossMultipartThresholdSize > 0,
"%s must be positive, the recommend value is 5GB.", OSS_MULTIPART_THRESHOLD_SIZE);
Preconditions.checkArgument(ossMultipartThresholdSize >= ossMultiPartSize,
"%s must be not less than %s (value: %s)", OSS_MULTIPART_THRESHOLD_SIZE, OSS_MULTIPART_SIZE, ossMultiPartSize);
}

public String ossEndpoint() {
return ossEndpoint;
}

public String ossAccessKeyId() {
return ossAccessKeyId;
}

public String ossAccessKeySecret() {
return ossAccessKeySecret;
}

public int ossMultipartUploadThreads() {
return ossMultipartUploadThreads;
}

public long ossMultiPartSize() {
return ossMultiPartSize;
}

public String ossStagingDirectory() {
return ossStagingDirectory;
}

public long ossMultipartThresholdSize() {
return ossMultipartThresholdSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun;

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DefaultAliyunClientFactory implements AliyunClientFactory {

private AliyunProperties aliyunProperties;

@Override
public OSS oss() {
Preconditions.checkNotNull(aliyunProperties,
"Cannot create aliyun oss client before initializing the AliyunClientFactory.");

return new OSSClientBuilder().build(
aliyunProperties.ossEndpoint(),
aliyunProperties.ossAccessKeyId(),
aliyunProperties.ossAccessKeySecret());
}

@Override
public void initialize(Map<String, String> properties) {
this.aliyunProperties = new AliyunProperties(properties);
}

@Override
public AliyunProperties aliyunProperties() {
return aliyunProperties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aliyun.oss;

import com.aliyun.oss.OSS;
import org.apache.iceberg.aliyun.AliyunProperties;

abstract class BaseOSSFile {
private final OSS client;
private final OSSURI uri;
private final AliyunProperties aliyunProperties;

BaseOSSFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) {
this.client = client;
this.uri = uri;
this.aliyunProperties = aliyunProperties;
}

public String location() {
return uri.location();
}

public OSS client() {
return client;
}

public OSSURI uri() {
return uri;
}

AliyunProperties aliyunProperties() {
return aliyunProperties;
}

boolean doesObjectExists() {
return client.doesObjectExist(uri.bucket(), uri.key());
}

@Override
public String toString() {
return uri.toString();
}
}
Loading