diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java new file mode 100644 index 000000000000..f39ce71b38c7 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunClientFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import java.io.Serializable; +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; + +/** + * Interface to customize OSS clients used by Iceberg. + * A custom factory must have a no-arg constructor, and use {@link #initialize(Map)} to initialize the factory. + */ +public interface AliyunClientFactory extends Serializable { + /** + * Create an aliyun OSS client. + * + * @return oss client. + */ + OSS oss(); + + /** + * Initialize Aliyun client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(Map properties); + + /** + * Returns an initialized {@link AliyunProperties} + */ + AliyunProperties aliyunProperties(); + + static AliyunClientFactory load(Map properties) { + String impl = properties.getOrDefault("client.factory", DefaultAliyunClientFactory.class.getName()); + return load(impl, properties); + } + + /** + * Load an implemented {@link AliyunClientFactory} based on the class name, and initialize it. + * + * @param impl the class name. + * @param properties to initialize the factory. + * @return an initialized {@link AliyunClientFactory}. + */ + static AliyunClientFactory load(String impl, Map properties) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(AliyunClientFactory.class).impl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize AliyunClientFactory, missing no-arg constructor: %s", impl), e); + } + + AliyunClientFactory factory; + try { + factory = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize AliyunClientFactory, %s does not implement AliyunClientFactory.", impl), e); + } + + factory.initialize(properties); + return factory; + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java new file mode 100644 index 000000000000..959b402b9cc8 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/AliyunProperties.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import java.io.Serializable; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.PropertyUtil; + +public class AliyunProperties implements Serializable { + + /** + * The domain name used to access OSS. OSS uses HTTP Restful APIs to provide services. Different regions are accessed + * by using different endpoints. For the same region, access over the internal network or over the Internet also uses + * different endpoints. For more information, see: + * https://www.alibabacloud.com/help/doc-detail/31837.htm + */ + public static final String OSS_ENDPOINT = "oss.endpoint"; + + /** + * OSS uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric + * encryption and verify the identity of a requester. The AccessKey ID is used to identify a user. + *
+ * For more information about how to obtain an AccessKey pair, see: + * https://www.alibabacloud.com/help/doc-detail/53045.htm + */ + public static final String OSS_ACCESS_KEY_ID = "oss.access.key.id"; + + /** + * OSS uses an AccessKey pair, which includes an AccessKey ID and an AccessKey secret to implement symmetric + * encryption and verify the identity of a requester. The AccessKey secret is used to encrypt and verify the + * signature string. + *
+   * For more information about how to obtain an AccessKey pair, see:
+   * https://www.alibabacloud.com/help/doc-detail/53045.htm
+   */
+  public static final String OSS_ACCESS_KEY_SECRET = "oss.access.key.secret";
+
+  /**
+   * Number of threads to use for uploading parts to OSS (shared pool across all output streams),
+   * defaults to {@link Runtime#availableProcessors()}.
+   */
+  public static final String OSS_MULTIPART_UPLOAD_THREADS = "oss.multipart.num-threads";
+
+  /**
+   * The size of a single part for multipart upload requests in bytes (default: 100KB). Based on the OSS requirement,
+   * the part size must be at least 100KB. To ensure performance of the reader and writer, the part size must be less
+   * than 5GB.
+   *
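+   * <p>
+   * For example, setting {@code oss.multipart.part-size-bytes} to 1048576 (an illustrative value, 1MB)
+   * makes the OSS output stream roll a new staging part file after each 1MB written.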
+   * For more details, please see: https://www.alibabacloud.com/help/doc-detail/31850.htm
+   */
+  public static final String OSS_MULTIPART_SIZE = "oss.multipart.part-size-bytes";
+  public static final long OSS_MULTIPART_SIZE_DEFAULT = 100 * 1024;
+  public static final long OSS_MULTIPART_SIZE_MIN = 100 * 1024;
+  public static final long OSS_MULTIPART_SIZE_MAX = 5 * 1024 * 1024 * 1024L;
+
+  /**
+   * Location to put staging files for upload to OSS, defaults to the temp directory set in java.io.tmpdir.
+   */
+  public static final String OSS_STAGING_DIRECTORY = "oss.staging-dir";
+
+  /**
+   * The threshold, expressed as an object byte size, at which to switch from uploading using a single put object
+   * request to uploading using multipart upload.
+   */
+  public static final String OSS_MULTIPART_THRESHOLD_SIZE = "oss.multipart.threshold.size-bytes";
+  public static final long OSS_MULTIPART_THRESHOLD_SIZE_DEFAULT = 5 * 1024 * 1024 * 1024L;
+
+  private final String ossEndpoint;
+  private final String ossAccessKeyId;
+  private final String ossAccessKeySecret;
+  private final int ossMultipartUploadThreads;
+  private final long ossMultiPartSize;
+  private final String ossStagingDirectory;
+  private final long ossMultipartThresholdSize;
+
+  public AliyunProperties() {
+    this(ImmutableMap.of());
+  }
+
+  public AliyunProperties(Map<String, String> properties) {
+    Preconditions.checkNotNull(properties, "Aliyun properties should not be null");
+
+    // OSS endpoint, accessKeyId, accessKeySecret.
+    this.ossEndpoint = properties.get(OSS_ENDPOINT);
+    this.ossAccessKeyId = properties.get(OSS_ACCESS_KEY_ID);
+    this.ossAccessKeySecret = properties.get(OSS_ACCESS_KEY_SECRET);
+
+    // OSS multipart upload threads.
+    this.ossMultipartUploadThreads = PropertyUtil.propertyAsInt(properties, OSS_MULTIPART_UPLOAD_THREADS,
+        Runtime.getRuntime().availableProcessors());
+    Preconditions.checkArgument(ossMultipartUploadThreads > 0, "%s must be positive.", OSS_MULTIPART_UPLOAD_THREADS);
+
+    // OSS multipart part size.
+    try {
+      this.ossMultiPartSize = PropertyUtil.propertyAsLong(properties,
+          OSS_MULTIPART_SIZE, OSS_MULTIPART_SIZE_DEFAULT);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(String.format(
+          "Input malformed or exceeds maximum multipart upload size 5GB: %s", properties.get(OSS_MULTIPART_SIZE)), e);
+    }
+    Preconditions.checkArgument(ossMultiPartSize >= OSS_MULTIPART_SIZE_MIN,
+        "Multipart upload part size must be at least 100KB.");
+    Preconditions.checkArgument(ossMultiPartSize <= OSS_MULTIPART_SIZE_MAX,
+        "Multipart upload part size must not exceed 5GB.");
+
+    // OSS staging directory.
+    this.ossStagingDirectory = PropertyUtil.propertyAsString(properties, OSS_STAGING_DIRECTORY,
+        System.getProperty("java.io.tmpdir"));
+
+    // OSS threshold to use multipart upload.
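+    // For example, with the defaults (100KB part size, 5GB threshold), an object that stays below the
+    // threshold is uploaded with a single PutObject request, while a larger one switches to OSS
+    // multipart upload as soon as the bytes written cross the threshold.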
+ this.ossMultipartThresholdSize = PropertyUtil.propertyAsLong(properties, + OSS_MULTIPART_THRESHOLD_SIZE, OSS_MULTIPART_THRESHOLD_SIZE_DEFAULT); + Preconditions.checkArgument(ossMultipartThresholdSize > 0, + "%s must be positive, the recommend value is 5GB.", OSS_MULTIPART_THRESHOLD_SIZE); + Preconditions.checkArgument(ossMultipartThresholdSize >= ossMultiPartSize, + "%s must be not less than %s (value: %s)", OSS_MULTIPART_THRESHOLD_SIZE, OSS_MULTIPART_SIZE, ossMultiPartSize); + } + + public String ossEndpoint() { + return ossEndpoint; + } + + public String ossAccessKeyId() { + return ossAccessKeyId; + } + + public String ossAccessKeySecret() { + return ossAccessKeySecret; + } + + public int ossMultipartUploadThreads() { + return ossMultipartUploadThreads; + } + + public long ossMultiPartSize() { + return ossMultiPartSize; + } + + public String ossStagingDirectory() { + return ossStagingDirectory; + } + + public long ossMultipartThresholdSize() { + return ossMultipartThresholdSize; + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java new file mode 100644 index 000000000000..78c24359e2dc --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/DefaultAliyunClientFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class DefaultAliyunClientFactory implements AliyunClientFactory { + + private AliyunProperties aliyunProperties; + + @Override + public OSS oss() { + Preconditions.checkNotNull(aliyunProperties, + "Cannot create aliyun oss client before initializing the AliyunClientFactory."); + + return new OSSClientBuilder().build( + aliyunProperties.ossEndpoint(), + aliyunProperties.ossAccessKeyId(), + aliyunProperties.ossAccessKeySecret()); + } + + @Override + public void initialize(Map properties) { + this.aliyunProperties = new AliyunProperties(properties); + } + + @Override + public AliyunProperties aliyunProperties() { + return aliyunProperties; + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java new file mode 100644 index 000000000000..90825b7166ff --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/BaseOSSFile.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import org.apache.iceberg.aliyun.AliyunProperties; + +abstract class BaseOSSFile { + private final OSS client; + private final OSSURI uri; + private final AliyunProperties aliyunProperties; + + BaseOSSFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { + this.client = client; + this.uri = uri; + this.aliyunProperties = aliyunProperties; + } + + public String location() { + return uri.location(); + } + + public OSS client() { + return client; + } + + public OSSURI uri() { + return uri; + } + + AliyunProperties aliyunProperties() { + return aliyunProperties; + } + + boolean doesObjectExists() { + return client.doesObjectExist(uri.bucket(), uri.key()); + } + + @Override + public String toString() { + return uri.toString(); + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java new file mode 100644 index 000000000000..767a5cd2a58f --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSFileIO.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.util.Map; +import org.apache.iceberg.aliyun.AliyunClientFactory; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.SerializableSupplier; + +/** + * FileIO implementation backend by OSS. + *
+ * Locations used must follow the conventions for OSS URIs (e.g. oss://bucket/path...).
+ * URIs with scheme https are also treated as oss file paths.
+ * Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}.
+ */
+public class OSSFileIO implements FileIO {
+
+  private SerializableSupplier<OSS> oss;
+  private AliyunProperties aliyunProperties;
+  private transient OSS client;
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   *
+ * All fields are initialized by calling {@link OSSFileIO#initialize(Map)} later. + */ + public OSSFileIO() { + } + + /** + * Constructor with custom oss supplier and default aliyun properties. + *
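+   * <p>
+   * For example (endpoint and credentials are placeholders):
+   * {@code new OSSFileIO(() -> new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret))}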
+ * Calling {@link OSSFileIO#initialize(Map)} will overwrite information set in this constructor. + * + * @param oss oss supplier + */ + public OSSFileIO(SerializableSupplier oss) { + this.oss = oss; + this.aliyunProperties = new AliyunProperties(); + } + + @Override + public InputFile newInputFile(String path) { + return new OSSInputFile(client(), new OSSURI(path), aliyunProperties); + } + + @Override + public OutputFile newOutputFile(String path) { + return new OSSOutputFile(client(), new OSSURI(path), aliyunProperties); + } + + @Override + public void deleteFile(String path) { + OSSURI location = new OSSURI(path); + client().deleteObject(location.bucket(), location.key()); + } + + private OSS client() { + if (client == null) { + client = oss.get(); + } + return client; + } + + @Override + public void initialize(Map properties) { + AliyunClientFactory factory = AliyunClientFactory.load(properties); + this.aliyunProperties = factory.aliyunProperties(); + this.oss = factory::oss; + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java new file mode 100644 index 000000000000..3d9a65d49c4f --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputFile.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +class OSSInputFile extends BaseOSSFile implements InputFile { + + OSSInputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { + super(client, uri, aliyunProperties); + } + + @Override + public long getLength() { + return client().getSimplifiedObjectMeta(uri().bucket(), uri().key()).getSize(); + } + + @Override + public SeekableInputStream newStream() { + return new OSSInputStream(client(), uri()); + } + + @Override + public boolean exists() { + return doesObjectExists(); + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java new file mode 100644 index 000000000000..38fa0d97f842 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSInputStream.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.GetObjectRequest; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class OSSInputStream extends SeekableInputStream { + private static final Logger LOG = LoggerFactory.getLogger(OSSInputStream.class); + private static final int SKIP_SIZE = 1024 * 1024; + + private final StackTraceElement[] createStack; + private final OSS client; + private final OSSURI uri; + + private InputStream stream = null; + private long pos = 0; + private long next = 0; + private boolean closed = false; + + OSSInputStream(OSS client, OSSURI uri) { + this.client = client; + this.uri = uri; + + this.createStack = Thread.currentThread().getStackTrace(); + } + + @Override + public long getPos() { + return next; + } + + @Override + public void seek(long newPos) { + Preconditions.checkState(!closed, "already closed"); + Preconditions.checkArgument(newPos >= 0, "position is negative: %s", newPos); + + // this allows a seek beyond the end of the stream but the next read will fail + next = newPos; + } + + @Override + public int read() throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + pos += 1; + next += 1; + + return stream.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + Preconditions.checkState(!closed, "Cannot read: already closed"); + positionStream(); + + int bytesRead = stream.read(b, off, len); + pos += bytesRead; + next += bytesRead; + + return bytesRead; + } + + @Override + public void close() throws IOException { + super.close(); + closed = true; + closeStream(); + } + + private void positionStream() throws IOException { + if ((stream != null) && (next == pos)) { + // already at specified position. + return; + } + + if ((stream != null) && (next > pos)) { + // seeking forwards + long skip = next - pos; + if (skip <= Math.max(stream.available(), SKIP_SIZE)) { + // already buffered or seek is small enough + LOG.debug("Read-through seek for {} to offset {}", uri, next); + try { + ByteStreams.skipFully(stream, skip); + pos = next; + return; + } catch (IOException ignored) { + // will retry by re-opening the stream. + } + } + } + + // close the stream and open at desired position. 
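+    // For example, a backward seek cannot rewind the HTTP body of the current GetObject response, so
+    // openStream() below issues a new ranged request starting at the target offset.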
+ LOG.debug("Seek with new stream for {} to offset {}", uri, next); + pos = next; + openStream(); + } + + private void openStream() throws IOException { + closeStream(); + + GetObjectRequest request = new GetObjectRequest(uri.bucket(), uri.key()) + .withRange(pos, -1); + stream = client.getObject(request).getObjectContent(); + } + + private void closeStream() throws IOException { + if (stream != null) { + stream.close(); + stream = null; + } + } + + @SuppressWarnings("checkstyle:NoFinalizer") + @Override + protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { + close(); + String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); + LOG.warn("Unclosed input stream created by: \n\t{}", trace); + } + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java new file mode 100644 index 000000000000..916d1ea9b552 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputFile.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; + +class OSSOutputFile extends BaseOSSFile implements OutputFile { + OSSOutputFile(OSS client, OSSURI uri, AliyunProperties aliyunProperties) { + super(client, uri, aliyunProperties); + } + + @Override + public PositionOutputStream create() { + if (!doesObjectExists()) { + return createOrOverwrite(); + } else { + throw new AlreadyExistsException("Location already exists: %s", uri()); + } + } + + @Override + public PositionOutputStream createOrOverwrite() { + try { + return new OSSOutputStream(client(), uri(), aliyunProperties()); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create output stream for location: " + uri(), e); + } + } + + @Override + public InputFile toInputFile() { + return new OSSInputFile(client(), uri(), aliyunProperties()); + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java new file mode 100644 index 000000000000..2a9ed9bd0697 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSOutputStream.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.model.AbortMultipartUploadRequest; +import com.aliyun.oss.model.CompleteMultipartUploadRequest; +import com.aliyun.oss.model.InitiateMultipartUploadRequest; +import com.aliyun.oss.model.InitiateMultipartUploadResult; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.PartETag; +import com.aliyun.oss.model.PutObjectRequest; +import com.aliyun.oss.model.UploadPartRequest; +import com.aliyun.oss.model.UploadPartResult; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Predicates; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class OSSOutputStream extends PositionOutputStream { + private static final Logger LOG = LoggerFactory.getLogger(OSSOutputStream.class); + + private static volatile ExecutorService executorService; + + private final StackTraceElement[] createStack; + private final OSS client; + private final OSSURI uri; + private final AliyunProperties aliyunProperties; + + // For multipart uploading. 
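+  // stagingFiles holds the on-disk part files in write order; multiPartMap tracks the asynchronous
+  // upload of each completed staging file once multipartUploadId has been initialized.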
+ private final long multiPartSize; + private final long multiPartThresholdSize; + private final File stagingDirectory; + private final List stagingFiles = Lists.newArrayList(); + private final Map> multiPartMap = Maps.newHashMap(); + private String multipartUploadId = null; + private File currentStagingFile; + private CountingOutputStream stream; + + private long pos = 0; + private boolean closed = false; + + OSSOutputStream(OSS client, OSSURI uri, AliyunProperties aliyunProperties) throws IOException { + this.client = client; + this.uri = uri; + this.aliyunProperties = aliyunProperties; + + this.createStack = Thread.currentThread().getStackTrace(); + + this.multiPartSize = aliyunProperties.ossMultiPartSize(); + this.multiPartThresholdSize = aliyunProperties.ossMultipartThresholdSize(); + this.stagingDirectory = new File(aliyunProperties.ossStagingDirectory()); + + // Initialize the executor service lazily. + initializeExecutorService(); + + newStream(); + } + + private void initializeExecutorService() { + if (executorService == null) { + synchronized (OSSOutputStream.class) { + if (executorService == null) { + executorService = MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) Executors.newFixedThreadPool( + aliyunProperties.ossMultipartUploadThreads(), + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("iceberg-oss-file-io-upload-%d") + .build())); + } + } + } + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void flush() throws IOException { + Preconditions.checkState(!closed, "Already closed."); + + stream.flush(); + } + + @Override + public void write(int b) throws IOException { + Preconditions.checkState(!closed, "Already closed."); + + if (stream.getCount() >= multiPartSize) { + newStream(); + uploadParts(); + } + + stream.write(b); + pos += 1; + + // switch to multipart upload + if (multipartUploadId == null && pos >= multiPartThresholdSize) { + initializeMultiPartUpload(); + uploadParts(); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + Preconditions.checkState(!closed, "Already closed."); + + int remaining = len; + int relativeOffset = off; + + // Write the remainder of the part size to the staging file + // and continue to write new staging files if the write is + // larger than the part size. 
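+    // For example (illustrative sizes): with a 100KB part size and 30KB already buffered, a 250KB
+    // write adds 70KB to the current staging file, fills a second staging file with 100KB, and
+    // leaves the remaining 80KB buffered in a third file.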
+ while (stream.getCount() + remaining > multiPartSize) { + int writeSize = (int) (multiPartSize - stream.getCount()); + + stream.write(b, relativeOffset, writeSize); + remaining -= writeSize; + relativeOffset += writeSize; + + newStream(); + uploadParts(); + } + + stream.write(b, relativeOffset, remaining); + pos += len; + + // switch to multipart upload + if (multipartUploadId == null && pos >= multiPartThresholdSize) { + initializeMultiPartUpload(); + uploadParts(); + } + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + super.close(); + closed = true; + + try { + stream.close(); + completeUploads(); + } finally { + cleanUpStagingFiles(); + } + } + + private void newStream() throws IOException { + if (stream != null) { + stream.close(); + } + + currentStagingFile = File.createTempFile("oss-file-io-", ".tmp", stagingDirectory); + currentStagingFile.deleteOnExit(); + stagingFiles.add(currentStagingFile); + + stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile))); + } + + private void initializeMultiPartUpload() { + InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(uri.bucket(), uri.key()); + InitiateMultipartUploadResult result = client.initiateMultipartUpload(request); + multipartUploadId = result.getUploadId(); + } + + private void uploadParts() { + // exit if multipart has not been initiated + if (multipartUploadId == null) { + return; + } + + stagingFiles.stream() + // do not upload the file currently being written + .filter(f -> closed || !f.equals(currentStagingFile)) + // do not upload any files that have already been processed + .filter(Predicates.not(multiPartMap::containsKey)) + .forEach(f -> { + UploadPartRequest uploadRequest = new UploadPartRequest(uri.bucket(), uri.key(), + multipartUploadId, stagingFiles.indexOf(f) + 1, uncheckedInputStream(f), f.length()); + + CompletableFuture future = CompletableFuture.supplyAsync( + () -> client.uploadPart(uploadRequest), + executorService + ).whenComplete((result, thrown) -> { + try { + Files.deleteIfExists(f.toPath()); + } catch (IOException e) { + LOG.warn("Failed to delete staging file: {}", f, e); + } + + if (thrown != null) { + LOG.error("Failed to upload part: {}", f, thrown); + abortUpload(); + } + }); + + multiPartMap.put(f, future); + }); + } + + private void abortUpload() { + if (multipartUploadId != null) { + try { + AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(uri.bucket(), uri.key(), + multipartUploadId); + client.abortMultipartUpload(request); + } finally { + cleanUpStagingFiles(); + } + } + } + + private void cleanUpStagingFiles() { + Tasks.foreach(stagingFiles) + .suppressFailureWhenFinished() + .onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, thrown)) + .run(File::delete); + } + + private void completeUploads() { + if (multipartUploadId == null) { + long contentLength = stagingFiles.stream().mapToLong(File::length).sum(); + LOG.debug("Uploading {} staging files to oss, total byte size is: {}", stagingFiles.size(), contentLength); + + InputStream contentStream = new BufferedInputStream(stagingFiles.stream() + .map(OSSOutputStream::uncheckedInputStream) + .reduce(SequenceInputStream::new) + .orElseGet(() -> new ByteArrayInputStream(new byte[0]))); + + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(contentLength); + + PutObjectRequest request = new PutObjectRequest(uri.bucket(), uri.key(), contentStream, metadata); + 
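+      // The staged files are chained with SequenceInputStream above, so this single PutObject call
+      // streams the whole object from local disk without buffering it in memory.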
client.putObject(request); + } else { + uploadParts(); + completeMultiPartUpload(); + } + } + + private void completeMultiPartUpload() { + Preconditions.checkState(closed, "Complete upload called on open stream: " + uri); + + List completedPartETags = + multiPartMap.values() + .stream() + .map(CompletableFuture::join) + .sorted(Comparator.comparing(UploadPartResult::getPartNumber)) + .map(UploadPartResult::getPartETag) + .collect(Collectors.toList()); + + CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest(uri.bucket(), uri.key(), + multipartUploadId, completedPartETags); + + Tasks.foreach(request) + .noRetry() + .onFailure((r, thrown) -> { + LOG.error("Failed to complete multipart upload request: {}", r, thrown); + abortUpload(); + }) + .throwFailureWhenFinished() + .run(client::completeMultipartUpload); + } + + private static InputStream uncheckedInputStream(File file) { + try { + return new FileInputStream(file); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @SuppressWarnings("checkstyle:NoFinalizer") + @Override + protected void finalize() throws Throwable { + super.finalize(); + if (!closed) { + close(); // releasing resources is more important than printing the warning. + String trace = Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length)); + LOG.warn("Unclosed output stream created by:\n\t{}", trace); + } + } +} diff --git a/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java new file mode 100644 index 000000000000..a14033aebb11 --- /dev/null +++ b/aliyun/src/main/java/org/apache/iceberg/aliyun/oss/OSSURI.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +public class OSSURI { + private static final String SCHEMA_DELIM = "://"; + private static final String PATH_DELIM = "/"; + private static final String QUERY_DELIM = "\\?"; + private static final String FRAGMENT_DELIM = "#"; + private static final Set VALID_SCHEMAS = ImmutableSet.of("https", "oss"); + + private final String location; + private final String bucket; + private final String key; + + /** + * Creates a new OSSURI based on the bucket and key parsed from the location as defined in: + * https://www.alibabacloud.com/help/doc-detail/31827.htm + *
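+   * <p>
+   * For example (an illustrative location): {@code new OSSURI("oss://my-bucket/path/to/data.parquet")}
+   * yields bucket {@code my-bucket} and key {@code path/to/data.parquet}; any query string or fragment
+   * is stripped from the key.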
+ * Supported access styles are Virtual Hosted addresses and oss://... URIs. + * + * @param location fully qualified URI. + */ + public OSSURI(String location) { + Preconditions.checkNotNull(location, "OSS location cannot be null."); + + this.location = location; + String[] schemaSplit = location.split(SCHEMA_DELIM, -1); + ValidationException.check(schemaSplit.length == 2, "Invalid OSS URI: %s", location); + + String schema = schemaSplit[0]; + ValidationException.check(VALID_SCHEMAS.contains(schema.toLowerCase()), "Invalid schema: %s", schema); + + String[] authoritySplit = schemaSplit[1].split(PATH_DELIM, 2); + ValidationException.check(authoritySplit.length == 2, "Invalid OSS URI: %s", location); + ValidationException.check(!authoritySplit[1].trim().isEmpty(), "Invalid OSS key: %s", location); + this.bucket = authoritySplit[0]; + + // Strip query and fragment if they exist + String path = authoritySplit[1]; + path = path.split(QUERY_DELIM, -1)[0]; + path = path.split(FRAGMENT_DELIM, -1)[0]; + this.key = path; + } + + /** + * Return OSS bucket name. + */ + public String bucket() { + return bucket; + } + + /** + * Return OSS object key name. + */ + public String key() { + return key; + } + + /** + * Returns original, unmodified OSS URI location. + */ + public String location() { + return location; + } + + @Override + public String toString() { + return location; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactory.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactory.java new file mode 100644 index 000000000000..ac1aa39106cc --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/TestAliyunClientFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.aliyun; + +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Test; + +public class TestAliyunClientFactory { + + @Test + public void testLoad() { + AssertHelpers.assertThrows("Invalid argument " + AliyunProperties.OSS_MULTIPART_UPLOAD_THREADS, + IllegalArgumentException.class, + () -> AliyunClientFactory.load( + ImmutableMap.of(AliyunProperties.OSS_MULTIPART_UPLOAD_THREADS, Integer.toString(-1))) + ); + + AssertHelpers.assertThrows("Invalid argument " + AliyunProperties.OSS_MULTIPART_SIZE, + IllegalArgumentException.class, + () -> AliyunClientFactory.load( + ImmutableMap.of(AliyunProperties.OSS_MULTIPART_SIZE, Integer.toString(99 * 1024))) + ); + + AssertHelpers.assertThrows("Invalid argument " + AliyunProperties.OSS_MULTIPART_SIZE, + IllegalArgumentException.class, + () -> AliyunClientFactory.load( + ImmutableMap.of(AliyunProperties.OSS_MULTIPART_SIZE, Long.toString(5 * 1024 * 1024 * 1024L + 1))) + ); + + AssertHelpers.assertThrows("Invalid argument " + AliyunProperties.OSS_MULTIPART_THRESHOLD_SIZE_DEFAULT, + IllegalArgumentException.class, + () -> AliyunClientFactory.load( + ImmutableMap.of(AliyunProperties.OSS_MULTIPART_THRESHOLD_SIZE, Integer.toString(0))) + ); + + AssertHelpers.assertThrows("Invalid argument " + AliyunProperties.OSS_MULTIPART_THRESHOLD_SIZE_DEFAULT, + IllegalArgumentException.class, + () -> AliyunClientFactory.load( + ImmutableMap.of( + AliyunProperties.OSS_MULTIPART_THRESHOLD_SIZE, Integer.toString(99 * 1024), + AliyunProperties.OSS_MULTIPART_SIZE, Integer.toString(100 * 1024) + ))); + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSIntegrationTestRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSIntegrationTestRule.java new file mode 100644 index 000000000000..91fb393f72c8 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSIntegrationTestRule.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.model.ListObjectsRequest; +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.ObjectListing; +import java.util.UUID; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * It's used for integration test. Add those environment variables for integration testing. + *
+ * <pre>
+ * export OSS_TEST_RULE_CLASS_IMPL=org.apache.iceberg.aliyun.oss.OSSIntegrationTestRule
+ * export OSS_TEST_ENDPOINT=${your-oss-endpoint}
+ * export OSS_TEST_ACCESS_KEY=${your-oss-access-key}
+ * export OSS_TEST_ACCESS_SECRET=${your-oss-access-secret}
+ * export OSS_TEST_BUCKET_NAME=${your-oss-bucket-name}
+ * export OSS_TEST_KEY_PREFIX=${your-oss-object-key-prefix}
+ * </pre>
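+ * <p>
+ * With these variables set, {@link OSSTestRule#initialize()} loads this class through the
+ * OSS_TEST_RULE_CLASS_IMPL environment variable and the suite runs against the real bucket instead of
+ * the local mock.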
+ */ +public class OSSIntegrationTestRule implements OSSTestRule { + + private static final String OSS_TEST_ENDPOINT = "OSS_TEST_ENDPOINT"; + private static final String OSS_TEST_ACCESS_KEY = "OSS_TEST_ACCESS_KEY"; + private static final String OSS_TEST_ACCESS_SECRET = "OSS_TEST_ACCESS_SECRET"; + private static final String OSS_TEST_BUCKET_NAME = "OSS_TEST_BUCKET_NAME"; + private static final String OSS_TEST_KEY_PREFIX = "OSS_TEST_KEY_PREFIX"; + + private String endpoint; + private String accessKey; + private String accessSecret; + private String testBucketName; + private String keyPrefix; + + private OSS lazyClient = null; + + @Override + public void start() { + endpoint = System.getenv(OSS_TEST_ENDPOINT); + Preconditions.checkNotNull(endpoint, "Does not set '%s' environment variable", OSS_TEST_ENDPOINT); + + accessKey = System.getenv(OSS_TEST_ACCESS_KEY); + Preconditions.checkNotNull(accessKey, "Does not set '%s' environment variable", OSS_TEST_ACCESS_KEY); + + accessSecret = System.getenv(OSS_TEST_ACCESS_SECRET); + Preconditions.checkNotNull(accessSecret, "Does not set '%s' environment variable", OSS_TEST_ACCESS_SECRET); + + testBucketName = System.getenv(OSS_TEST_BUCKET_NAME); + Preconditions.checkNotNull(testBucketName, "Does not set '%s' environment variable", OSS_TEST_BUCKET_NAME); + + keyPrefix = System.getenv(OSS_TEST_KEY_PREFIX); + if (keyPrefix == null) { + keyPrefix = String.format("iceberg-oss-testing-%s", UUID.randomUUID()); + } + } + + @Override + public void stop() { + + } + + private OSS client() { + if (lazyClient == null) { + synchronized (OSSIntegrationTestRule.class) { + if (lazyClient == null) { + lazyClient = createOSSClient(); + } + } + } + + return lazyClient; + } + + @Override + public String testBucketName() { + return testBucketName; + } + + @Override + public OSS createOSSClient() { + Preconditions.checkNotNull(endpoint, "OSS endpoint cannot be null"); + Preconditions.checkNotNull(accessKey, "OSS access key cannot be null"); + Preconditions.checkNotNull(accessSecret, "OSS access secret cannot be null"); + + return new OSSClientBuilder().build(endpoint, accessKey, accessSecret); + } + + @Override + public String keyPrefix() { + return keyPrefix; + } + + @Override + public void setUpBucket(String bucket) { + Preconditions.checkArgument(client().doesBucketExist(bucket), + "Bucket %s does not exist, please create it firstly.", bucket); + } + + @Override + public void tearDownBucket(String bucket) { + ObjectListing objectListing = client().listObjects( + new ListObjectsRequest(bucket) + .withPrefix(keyPrefix) + ); + + for (OSSObjectSummary s : objectListing.getObjectSummaries()) { + client().deleteObject(bucket, s.getKey()); + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSTestBase.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSTestBase.java new file mode 100644 index 000000000000..700df9ac9d65 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSTestBase.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.SerializableSupplier; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; + +public abstract class OSSTestBase { + + @ClassRule + public static final OSSTestRule OSS_TEST_RULE = OSSTestRule.initialize(); + + private final SerializableSupplier oss = OSS_TEST_RULE::createOSSClient; + private final String bucketName = OSS_TEST_RULE.testBucketName(); + private final String keyPrefix = OSS_TEST_RULE.keyPrefix(); + + private OSSFileIO fileIO; + + @Before + public void before() { + fileIO = new OSSFileIO(oss); + + OSS_TEST_RULE.setUpBucket(bucketName); + } + + @After + public void after() { + OSS_TEST_RULE.tearDownBucket(bucketName); + } + + protected String location(String key) { + return String.format("oss://%s/%s%s", bucketName, keyPrefix, key); + } + + protected SerializableSupplier oss() { + return oss; + } + + protected FileIO fileIO() { + return fileIO; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSTestRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSTestRule.java new file mode 100644 index 000000000000..7f164aadc3b5 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/OSSTestRule.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.util.UUID; +import org.apache.iceberg.aliyun.oss.mock.OSSMockRule; +import org.apache.iceberg.common.DynConstructors; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +public interface OSSTestRule extends TestRule { + Logger LOG = LoggerFactory.getLogger(OSSTestRule.class); + UUID RANDOM_UUID = java.util.UUID.randomUUID(); + + String OSS_TEST_RULE_CLASS_IMPL = "OSS_TEST_RULE_CLASS_IMPL"; + + /** + * Start the Aliyun Object storage services application that the OSS client could connect to. + */ + void start(); + + /** + * Stop the Aliyun object storage services. + */ + void stop(); + + /** + * Returns an newly created {@link OSS} client. 
+   */
+  OSS createOSSClient();
+
+  /**
+   * Returns a specific bucket name for testing purposes.
+   */
+  default String testBucketName() {
+    return String.format("oss-testing-bucket-%s", RANDOM_UUID);
+  }
+
+  /**
+   * Returns the common key prefix for objects newly created in test cases. For example, if we set the test bucket
+   * to 'oss-testing-bucket' and the key prefix to 'iceberg-objects/', the objects produced in test cases
+   * will be:
+   * <pre>
+   *   oss://oss-testing-bucket/iceberg-objects/a.dat
+   *   oss://oss-testing-bucket/iceberg-objects/b.dat
+   *   ...
+   * </pre>
+ */ + String keyPrefix(); + + /** + * Preparation work of bucket for the test case, for example we need to check the existence of specific bucket. + */ + void setUpBucket(String bucket); + + /** + * Clean all the objects that created from this test suite in the bucket. + */ + void tearDownBucket(String bucket); + + @Override + default Statement apply(Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + start(); + try { + base.evaluate(); + } finally { + stop(); + } + } + }; + } + + static OSSTestRule initialize() { + String implClass = System.getenv(OSSIntegrationTestRule.OSS_TEST_RULE_CLASS_IMPL); + + LOG.info("The initializing OSSTestRule implementation is: {}", implClass); + + OSSTestRule testRule; + if (!StringUtils.isEmpty(implClass)) { + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(OSSTestRule.class).impl(implClass).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize OSSTestRule, missing no-arg constructor: %s", implClass), e); + } + + try { + testRule = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize OSSTestRule, %s does not implement OSSTestRule.", implClass), e); + } + + } else { + testRule = OSSMockRule.builder().silent().build(); + } + + return testRule; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java new file mode 100644 index 000000000000..5fd20ee792e3 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSFileIO.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSClientBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Random; +import java.util.UUID; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.SerializableSupplier; +import org.apache.iceberg.util.SerializationUtil; +import org.junit.Assert; +import org.junit.Test; + +public class TestOSSFileIO extends OSSTestBase { + private static final String OSS_IMPL_CLASS = OSSFileIO.class.getName(); + + private final Random random = new Random(1); + private final Configuration conf = new Configuration(); + + @Test + public void newInputFile() throws IOException { + String location = location("key.txt"); + byte[] expected = new byte[1024 * 1024]; + random.nextBytes(expected); + + InputFile in = fileIO().newInputFile(location); + Assert.assertFalse(in.exists()); + + OutputFile out = fileIO().newOutputFile(location); + try (OutputStream os = out.createOrOverwrite()) { + IOUtils.write(expected, os); + } + + Assert.assertTrue(in.exists()); + + byte[] actual = new byte[1024 * 1024]; + try (InputStream is = in.newStream()) { + IOUtils.readFully(is, actual); + } + Assert.assertArrayEquals(expected, actual); + + fileIO().deleteFile(in); + Assert.assertFalse(fileIO().newInputFile(location).exists()); + } + + @Test + public void testLoadFileIO() { + FileIO fileIO = CatalogUtil.loadFileIO(OSS_IMPL_CLASS, ImmutableMap.of(), conf); + Assert.assertTrue("Should be OSSFileIO", fileIO instanceof OSSFileIO); + + byte[] data = SerializationUtil.serializeToBytes(fileIO); + FileIO expectedFileIO = SerializationUtil.deserializeFromBytes(data); + Assert.assertTrue("The deserialized FileIO should be OSSFileIO", expectedFileIO instanceof OSSFileIO); + } + + @Test + public void serializeClient() throws URISyntaxException { + String endpoint = "iceberg-test-oss.aliyun.com"; + String accessKeyId = UUID.randomUUID().toString(); + String accessSecret = UUID.randomUUID().toString(); + SerializableSupplier pre = () -> new OSSClientBuilder().build(endpoint, accessKeyId, accessSecret); + + byte[] data = SerializationUtil.serializeToBytes(pre); + SerializableSupplier post = SerializationUtil.deserializeFromBytes(data); + + OSS client = post.get(); + Assert.assertTrue(client instanceof OSSClient); + + OSSClient ossClient = (OSSClient) client; + Assert.assertEquals(new URI("http://" + endpoint), ossClient.getEndpoint()); + Assert.assertEquals(accessKeyId, ossClient.getCredentialsProvider().getCredentials().getAccessKeyId()); + Assert.assertEquals(accessSecret, ossClient.getCredentialsProvider().getCredentials().getSecretAccessKey()); + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java new file mode 100644 index 000000000000..51e1a8c1f4b6 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSInputStream.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import org.apache.commons.io.IOUtils; +import org.apache.iceberg.io.SeekableInputStream; +import org.junit.Test; + +import static org.apache.iceberg.AssertHelpers.assertThrows; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class TestOSSInputStream extends OSSTestBase { + + private final Random random = new Random(1); + + @Test + public void testRead() throws Exception { + OSSURI uri = new OSSURI(location("read.dat")); + int dataSize = 1024 * 1024 * 10; + byte[] data = randomData(dataSize); + + writeOSSData(uri, data); + + try (SeekableInputStream in = new OSSInputStream(oss().get(), uri)) { + int readSize = 1024; + + readAndCheck(in, in.getPos(), readSize, data, false); + readAndCheck(in, in.getPos(), readSize, data, true); + + // Seek forward in current stream + int seekSize = 1024; + readAndCheck(in, in.getPos() + seekSize, readSize, data, false); + readAndCheck(in, in.getPos() + seekSize, readSize, data, true); + + // Buffered read + readAndCheck(in, in.getPos(), readSize, data, true); + readAndCheck(in, in.getPos(), readSize, data, false); + + // Seek with new stream + long seekNewStreamPosition = 2 * 1024 * 1024; + readAndCheck(in, in.getPos() + seekNewStreamPosition, readSize, data, true); + readAndCheck(in, in.getPos() + seekNewStreamPosition, readSize, data, false); + + // Backseek and read + readAndCheck(in, 0, readSize, data, true); + readAndCheck(in, 0, readSize, data, false); + } + } + + private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byte[] original, boolean buffered) + throws IOException { + in.seek(rangeStart); + assertEquals(rangeStart, in.getPos()); + + long rangeEnd = rangeStart + size; + byte[] actual = new byte[size]; + + if (buffered) { + IOUtils.readFully(in, actual); + } else { + int read = 0; + while (read < size) { + actual[read++] = (byte) in.read(); + } + } + + assertEquals(rangeEnd, in.getPos()); + assertArrayEquals(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd), actual); + } + + @Test + public void testClose() throws Exception { + OSSURI uri = new OSSURI(location("closed.dat")); + SeekableInputStream closed = new OSSInputStream(oss().get(), uri); + closed.close(); + assertThrows("Cannot seek the input stream after closed.", IllegalStateException.class, () -> { + closed.seek(0); + return null; + }); + } + + @Test + public void testSeek() throws Exception { + OSSURI uri = new OSSURI(location("seek.dat")); + byte[] expected = randomData(1024 * 1024); + + writeOSSData(uri, expected); + + try (SeekableInputStream in = new OSSInputStream(oss().get(), uri)) { + in.seek(expected.length / 2); + + byte[] actual 
= new byte[expected.length / 2]; + IOUtils.readFully(in, actual); + assertArrayEquals(Arrays.copyOfRange(expected, expected.length / 2, expected.length), actual); + } + } + + private byte[] randomData(int size) { + byte[] data = new byte[size]; + random.nextBytes(data); + return data; + } + + private void writeOSSData(OSSURI uri, byte[] data) { + oss().get().putObject(uri.bucket(), uri.key(), new ByteArrayInputStream(data)); + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java new file mode 100644 index 000000000000..4acd15ac6562 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSOutputStream.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Random; +import java.util.UUID; +import org.apache.commons.io.IOUtils; +import org.apache.iceberg.aliyun.AliyunProperties; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestOSSOutputStream extends OSSTestBase { + private static final Logger LOG = LoggerFactory.getLogger(TestOSSOutputStream.class); + + private final OSS oss = oss().get(); + private final OSS ossMock = mock(OSS.class, delegatesTo(oss)); + + private final Random random = new Random(1); + private final Path tmpDir = Files.createTempDirectory("oss-file-io-test-"); + + private AliyunProperties props = new AliyunProperties(ImmutableMap.of( + AliyunProperties.OSS_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024), + AliyunProperties.OSS_MULTIPART_THRESHOLD_SIZE, Integer.toString(8 * 1024 * 1024), + AliyunProperties.OSS_STAGING_DIRECTORY, tmpDir.toString() + )); + + public TestOSSOutputStream() throws IOException { + } + + @Test + public void testWrite() throws IOException { + OSSURI uri = randomURI(); + + for (int i = 0; i < 2; i++) { + boolean arrayWrite = i % 2 == 0; + // Write small file that is less than multi part size. 
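+      // (A 256-byte payload stays well below the 5 MB multipart size configured in props above, so a
+      // single putObject call is expected here rather than a multipart upload.)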
+ writeAndVerify(ossMock, uri, data256(), arrayWrite); + verify(ossMock, times(1)).putObject(any()); + reset(ossMock); + + // Write file larger than part size but less than multipart threshold. + writeAndVerify(ossMock, uri, randomData(6 * 1024 * 1024), arrayWrite); + verify(ossMock, times(1)).putObject(any()); + reset(ossMock); + + // Write file large enough to trigger multipart upload. + writeAndVerify(ossMock, uri, randomData(10 * 1024 * 1024), arrayWrite); + verify(ossMock, times(1)).initiateMultipartUpload(any()); + verify(ossMock, times(2)).uploadPart(any()); + verify(ossMock, times(1)).completeMultipartUpload(any()); + reset(ossMock); + + // Test uploading many parts + writeAndVerify(ossMock, uri, randomData(22 * 1024 * 1024), arrayWrite); + verify(ossMock, times(1)).initiateMultipartUpload(any()); + verify(ossMock, times(5)).uploadPart(any()); + verify(ossMock, times(1)).completeMultipartUpload(any()); + reset(ossMock); + } + } + + @Test + public void testAbortAfterFailedPartUpload() { + doThrow(new RuntimeException()).when(ossMock).uploadPart(any()); + + boolean caughtError = false; + try (OSSOutputStream stream = new OSSOutputStream(ossMock, randomURI(), props)) { + stream.write(randomData(10 * 1024 * 1024)); + } catch (Exception e) { + caughtError = true; + verify(ossMock, times(1)).initiateMultipartUpload(any()); + verify(ossMock, atLeastOnce()).abortMultipartUpload(any()); + } + Assert.assertTrue("Should have encountered the upload part failure.", caughtError); + } + + @Test + public void testAbortMultipart() { + doThrow(new RuntimeException()).when(ossMock).completeMultipartUpload(any()); + + boolean caughtError = false; + try (OSSOutputStream stream = new OSSOutputStream(ossMock, randomURI(), props)) { + stream.write(randomData(10 * 1024 * 1024)); + } catch (Exception e) { + caughtError = true; + verify(ossMock).abortMultipartUpload(any()); + } + Assert.assertTrue("Should have encountered the failure", caughtError); + } + + @Test + public void testMultipleClose() throws IOException { + OSSOutputStream stream = new OSSOutputStream(ossMock, randomURI(), props); + stream.close(); + stream.close(); + } + + private void writeAndVerify(OSS mock, OSSURI uri, byte[] data, boolean arrayWrite) + throws IOException { + LOG.info("Write and verify for arguments uri: {}, data length: {}, arrayWrite: {}", uri, data.length, + arrayWrite); + + try (OSSOutputStream out = new OSSOutputStream(mock, uri, props)) { + if (arrayWrite) { + out.write(data); + Assert.assertEquals(data.length, out.getPos()); + } else { + for (int i = 0; i < data.length; i++) { + out.write(data[i]); + Assert.assertEquals(i + 1, out.getPos()); + } + } + } + + InputFile inputFile = fileIO().newInputFile(uri.location()); + Assert.assertTrue(inputFile.exists()); + Assert.assertEquals(data.length, inputFile.getLength()); + + byte[] actual = new byte[data.length]; + try (SeekableInputStream in = inputFile.newStream()) { + IOUtils.readFully(in, actual); + } + Assert.assertArrayEquals(data, actual); + + // Verify all staging files are cleaned up. 
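+    // (OSSOutputStream is expected to stage bytes in a temporary file under props.ossStagingDirectory()
+    // and remove it on close(); anything left in the directory would be a leaked staging file.)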
+ Assert.assertEquals(0, Files.list(Paths.get(props.ossStagingDirectory())).count()); + } + + private OSSURI randomURI() { + return new OSSURI(location(String.format("%s.dat", UUID.randomUUID()))); + } + + private byte[] data256() { + byte[] data = new byte[256]; + for (int i = 0; i < 256; i++) { + data[i] = (byte) i; + } + return data; + } + + private byte[] randomData(int size) { + byte[] data = new byte[size]; + random.nextBytes(data); + return data; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java new file mode 100644 index 000000000000..19771ab4d786 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +public class TestOSSURI { + + @Test + public void testLocationParsing() { + String p1 = "oss://bucket/path/to/file"; + OSSURI uri1 = new OSSURI(p1); + + Assert.assertEquals("bucket", uri1.bucket()); + Assert.assertEquals("path/to/file", uri1.key()); + Assert.assertEquals(p1, uri1.toString()); + } + + @Test + public void testEncodedString() { + String p1 = "oss://bucket/path%20to%20file"; + OSSURI uri1 = new OSSURI(p1); + + Assert.assertEquals("bucket", uri1.bucket()); + Assert.assertEquals("path%20to%20file", uri1.key()); + Assert.assertEquals(p1, uri1.toString()); + } + + @Test + public void missingKey() { + AssertHelpers.assertThrows("Missing key", ValidationException.class, () -> new OSSURI("https://bucket/")); + } + + @Test + public void relativePathing() { + AssertHelpers.assertThrows("Cannot use relative oss path.", ValidationException.class, + () -> new OSSURI("/path/to/file")); + } + + @Test + public void invalidScheme() { + AssertHelpers.assertThrows("Invalid schema", ValidationException.class, () -> new OSSURI("invalid://bucket/")); + } + + @Test + public void testQueryAndFragment() { + String p1 = "oss://bucket/path/to/file?query=foo#bar"; + OSSURI uri1 = new OSSURI(p1); + + Assert.assertEquals("bucket", uri1.bucket()); + Assert.assertEquals("path/to/file", uri1.key()); + Assert.assertEquals(p1, uri1.toString()); + } + + @Test + public void testValidSchemes() { + for (String scheme : Lists.newArrayList("https", "oss")) { + OSSURI uri = new OSSURI(scheme + "://bucket/path/to/file"); + Assert.assertEquals("bucket", uri.bucket()); + Assert.assertEquals("path/to/file", uri.key()); + } + } +} diff --git 
a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/LocalOSSController.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/LocalOSSController.java new file mode 100644 index 000000000000..5d8f30ad0944 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/LocalOSSController.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.model.Bucket; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonRootName; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.BoundedInputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler; + +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; +import static org.springframework.http.HttpStatus.OK; +import static org.springframework.http.HttpStatus.PARTIAL_CONTENT; +import static org.springframework.http.HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE; + +@RestController +public class LocalOSSController { + private static final Logger LOG = LoggerFactory.getLogger(LocalOSSController.class); + + @Autowired + private LocalStore localStore; + + @RequestMapping(value = "/{bucketName}", method = RequestMethod.PUT, produces = "application/xml") + public 
void putBucket(@PathVariable String bucketName) throws IOException {
+    if (localStore.getBucket(bucketName) != null) {
+      throw new OssException(409, OSSErrorCode.BUCKET_ALREADY_EXISTS, bucketName + " already exists.");
+    }
+
+    localStore.createBucket(bucketName);
+  }
+
+  @RequestMapping(value = "/{bucketName}", method = RequestMethod.DELETE, produces = "application/xml")
+  public void deleteBucket(@PathVariable String bucketName) throws IOException {
+    verifyBucketExistence(bucketName);
+
+    localStore.deleteBucket(bucketName);
+  }
+
+  @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.PUT)
+  public ResponseEntity<String> putObject(@PathVariable String bucketName, HttpServletRequest request) {
+    verifyBucketExistence(bucketName);
+    String filename = filenameFrom(bucketName, request);
+    try (ServletInputStream inputStream = request.getInputStream()) {
+      ObjectMetadata metadata = localStore.putObject(bucketName,
+          filename,
+          inputStream,
+          request.getContentType(),
+          request.getHeader(HttpHeaders.CONTENT_ENCODING),
+          ImmutableMap.of());
+
+      HttpHeaders responseHeaders = new HttpHeaders();
+      responseHeaders.setETag("\"" + metadata.getContentMD5() + "\"");
+      responseHeaders.setLastModified(metadata.getLastModificationDate());
+
+      return new ResponseEntity<>(responseHeaders, OK);
+    } catch (Exception e) {
+      LOG.error("Failed to put object - bucket: {} - object: {}", bucketName, filename, e);
+      return new ResponseEntity<>(e.getMessage(), INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.DELETE)
+  public void deleteObject(@PathVariable String bucketName, HttpServletRequest request) {
+    verifyBucketExistence(bucketName);
+
+    localStore.deleteObject(bucketName, filenameFrom(bucketName, request));
+  }
+
+  @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.HEAD)
+  public ResponseEntity<String> getObjectMeta(@PathVariable String bucketName, HttpServletRequest request) {
+    verifyBucketExistence(bucketName);
+    ObjectMetadata metadata = verifyObjectExistence(bucketName, filenameFrom(bucketName, request));
+
+    HttpHeaders headers = new HttpHeaders();
+    headers.setETag("\"" + metadata.getContentMD5() + "\"");
+    headers.setLastModified(metadata.getLastModificationDate());
+    headers.setContentLength(metadata.getContentLength());
+
+    return new ResponseEntity<>(headers, OK);
+  }
+
+  @SuppressWarnings("checkstyle:AnnotationUseStyle")
+  @RequestMapping(
+      value = "/{bucketName:.+}/**",
+      method = RequestMethod.GET,
+      produces = "application/xml")
+  public void getObject(@PathVariable String bucketName,
+                        @RequestHeader(value = "Range", required = false) Range range,
+                        HttpServletRequest request,
+                        HttpServletResponse response) throws IOException {
+    verifyBucketExistence(bucketName);
+
+    String filename = filenameFrom(bucketName, request);
+    ObjectMetadata metadata = verifyObjectExistence(bucketName, filename);
+
+    if (range != null) {
+      long fileSize = metadata.getContentLength();
+      long bytesToRead = Math.min(fileSize - 1, range.end()) - range.start() + 1;
+
+      if (bytesToRead < 0 || fileSize < range.start()) {
+        response.setStatus(REQUESTED_RANGE_NOT_SATISFIABLE.value());
+        response.flushBuffer();
+        return;
+      }
+
+      response.setStatus(PARTIAL_CONTENT.value());
+      response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
+      // The end position in a Content-Range header is inclusive, so the last byte served is
+      // range.start() + bytesToRead - 1.
+      response.setHeader(HttpHeaders.CONTENT_RANGE, String.format("bytes %s-%s/%s",
+          range.start(), range.start() + bytesToRead - 1, metadata.getContentLength()));
+      response.setHeader(HttpHeaders.ETAG, "\"" +
metadata.getContentMD5() + "\""); + response.setDateHeader(HttpHeaders.LAST_MODIFIED, metadata.getLastModificationDate()); + response.setContentType(metadata.getContentType()); + response.setContentLengthLong(bytesToRead); + + try (OutputStream outputStream = response.getOutputStream()) { + try (FileInputStream fis = new FileInputStream(metadata.getDataFile())) { + fis.skip(range.start()); + IOUtils.copy(new BoundedInputStream(fis, bytesToRead), outputStream); + } + } + + } else { + response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes"); + response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() + "\""); + response.setDateHeader(HttpHeaders.LAST_MODIFIED, metadata.getLastModificationDate()); + response.setContentType(metadata.getContentType()); + response.setContentLengthLong(metadata.getContentLength()); + + try (OutputStream outputStream = response.getOutputStream()) { + try (FileInputStream fis = new FileInputStream(metadata.getDataFile())) { + IOUtils.copy(fis, outputStream); + } + } + } + } + + @RequestMapping( + value = "/{bucketName:.+}/**", + params = "uploads", + method = RequestMethod.POST, + produces = "application/xml") + public InitiateMultipartUploadResult initializeMultiPartUpload(@PathVariable String bucketName, + HttpServletRequest request) { + verifyBucketExistence(bucketName); + + String filename = filenameFrom(bucketName, request); + String uploadId = UUID.randomUUID().toString(); + + localStore.prepareMultipartUpload(bucketName, filename, uploadId); + return new InitiateMultipartUploadResult(bucketName, filename, uploadId); + } + + @RequestMapping( + value = "/{bucketName:.+}/**", + params = {"uploadId", "partNumber"}, + method = RequestMethod.PUT) + public ResponseEntity uploadPart(@PathVariable String bucketName, + @RequestParam String uploadId, + @RequestParam String partNumber, + HttpServletRequest request) + throws IOException { + verifyBucketExistence(bucketName); + verifyPartNumberLimits(partNumber); + + String etag = localStore.putPart(bucketName, uploadId, partNumber, request.getInputStream()); + + HttpHeaders responseHeaders = new HttpHeaders(); + responseHeaders.setETag(String.format("\"%s\"", etag)); + + return new ResponseEntity<>(responseHeaders, OK); + } + + @SuppressWarnings("checkstyle:AnnotationUseStyle") + @RequestMapping( + value = "/{bucketName:.+}/**", + params = {"uploadId"}, + method = RequestMethod.POST, + produces = "application/xml") + public ResponseEntity completeMultiPartUpload(@PathVariable String bucketName, + @RequestParam String uploadId, + @RequestBody + CompleteMultipartUploadRequest + requestBody, + HttpServletRequest request) + throws IOException { + verifyBucketExistence(bucketName); + + String filename = filenameFrom(bucketName, request); + verifyMultiParts(requestBody.parts); + + String eTag = localStore.completeMultipartUpload(bucketName, filename, uploadId, requestBody.parts); + return new ResponseEntity<>( + new CompleteMultipartUploadResult(request.getRequestURL().toString(), bucketName, filename, eTag), + new HttpHeaders(), OK + ); + } + + @SuppressWarnings("checkstyle:AnnotationUseStyle") + @RequestMapping( + value = "/{bucketName:.+}/**", + params = {"uploadId"}, + method = RequestMethod.DELETE, + produces = "application/xml") + public void abortMultipartUploads(@PathVariable String bucketName, + @RequestParam String uploadId, + HttpServletRequest request) { + verifyBucketExistence(bucketName); + + String filename = filenameFrom(bucketName, request); + localStore.abortMultipartUpload(bucketName, filename, 
uploadId);
+  }
+
+  private void verifyBucketExistence(String bucketName) {
+    Bucket bucket = localStore.getBucket(bucketName);
+    if (bucket == null) {
+      throw new OssException(404, OSSErrorCode.NO_SUCH_BUCKET, "The specified bucket does not exist.");
+    }
+  }
+
+  private ObjectMetadata verifyObjectExistence(String bucketName, String filename) {
+    ObjectMetadata objectMetadata = null;
+    try {
+      objectMetadata = localStore.getObjectMetadata(bucketName, filename);
+    } catch (IOException e) {
+      LOG.error("Failed to get the object metadata, bucket: {}, object: {}.", bucketName, filename, e);
+    }
+
+    if (objectMetadata == null) {
+      throw new OssException(404, OSSErrorCode.NO_SUCH_KEY, "The specified OSS key does not exist.");
+    }
+
+    return objectMetadata;
+  }
+
+  private static String filenameFrom(@PathVariable String bucketName, HttpServletRequest request) {
+    String requestUri = request.getRequestURI();
+    return requestUri.substring(requestUri.indexOf(bucketName) + bucketName.length() + 1);
+  }
+
+  private void verifyPartNumberLimits(String partNumberString) {
+    boolean isValid;
+    try {
+      int partNumber = Integer.parseInt(partNumberString);
+      isValid = partNumber >= 1 && partNumber <= 10000;
+    } catch (NumberFormatException e) {
+      isValid = false;
+    }
+
+    if (!isValid) {
+      throw new OssException(HttpStatus.BAD_REQUEST.value(), "InvalidArgument",
+          "Part number must be an integer between 1 and 10000, inclusive");
+    }
+  }
+
+  private void verifyMultiParts(List<Part> parts) {
+    int prevPartNumber = 0;
+    for (Part part : parts) {
+      if (part.partNumber < prevPartNumber) {
+        throw new OssException(HttpStatus.BAD_REQUEST.value(), "InvalidPartOrder",
+            "The list of parts was not in ascending order. The parts list must be specified in " +
+                "order by part number.");
+      }
+      // Track the previous part number; without this update, out-of-order parts would never be detected.
+      prevPartNumber = part.partNumber;
+    }
+  }
+
+  @ControllerAdvice
+  public static class OSSMockExceptionHandler extends ResponseEntityExceptionHandler {
+
+    @ExceptionHandler
+    public ResponseEntity<ErrorResponse> handleOSSException(OssException ex) {
+      LOG.info("Responding with status {} - {}, {}", ex.status, ex.code, ex.message);
+
+      ErrorResponse errorResponse = new ErrorResponse();
+      errorResponse.setCode(ex.getCode());
+      errorResponse.setMessage(ex.getMessage());
+
+      HttpHeaders headers = new HttpHeaders();
+      headers.setContentType(MediaType.APPLICATION_XML);
+
+      return ResponseEntity.status(ex.status)
+          .headers(headers)
+          .body(errorResponse);
+    }
+  }
+
+  public static class OssException extends RuntimeException {
+
+    private final int status;
+    private final String code;
+    private final String message;
+
+    public OssException(final int status, final String code, final String message) {
+      super(message);
+      this.status = status;
+      this.code = code;
+      this.message = message;
+    }
+
+    public String getCode() {
+      return code;
+    }
+
+    @Override
+    public String getMessage() {
+      return message;
+    }
+  }
+
+  @JsonRootName("Error")
+  public static class ErrorResponse {
+    @JsonProperty("Code")
+    private String code;
+
+    @JsonProperty("Message")
+    private String message;
+
+    public void setCode(String code) {
+      this.code = code;
+    }
+
+    public void setMessage(String message) {
+      this.message = message;
+    }
+  }
+
+  @JsonRootName("InitiateMultipartUploadResult")
+  public static class InitiateMultipartUploadResult {
+
+    @JsonProperty("Bucket")
+    private final String bucketName;
+
+    @JsonProperty("Key")
+    private final String fileName;
+
+    @JsonProperty("UploadId")
+    private final String uploadId;
+
+    public InitiateMultipartUploadResult(String bucketName,
+                                         String fileName,
+                                         String uploadId) {
+      this.bucketName = bucketName;
+      this.fileName = fileName;
+      this.uploadId = uploadId;
+    }
+  }
+
+  @JsonRootName("Part")
+  public static class Part {
+
+    @JsonProperty("PartNumber")
+    Integer partNumber;
+
+    @JsonProperty("LastModified")
+    @JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
+    Date lastModified;
+
+    @JsonProperty("ETag")
+    String etag;
+
+    @JsonProperty("Size")
+    Long size;
+  }
+
+  @JsonRootName("CompleteMultipartUpload")
+  public static class CompleteMultipartUploadRequest {
+
+    @JsonProperty("Part")
+    @JacksonXmlElementWrapper(useWrapping = false)
+    private List<Part> parts;
+  }
+
+  // Declared static, like the other XML payload classes, so it can be instantiated and serialized
+  // without a reference to an enclosing controller instance.
+  @JsonRootName("CompleteMultipartUploadResult")
+  public static class CompleteMultipartUploadResult {
+
+    @JsonProperty("Location")
+    private final String location;
+
+    @JsonProperty("Bucket")
+    private final String bucket;
+
+    @JsonProperty("Key")
+    private final String key;
+
+    @JsonProperty("ETag")
+    private final String etag;
+
+    public CompleteMultipartUploadResult(String location, String bucket, String key, String etag) {
+      this.location = location;
+      this.bucket = bucket;
+      this.key = key;
+      this.etag = etag;
+    }
+  }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/LocalStore.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/LocalStore.java
new file mode 100644
index 000000000000..7d67d91af475
--- /dev/null
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/LocalStore.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.model.Bucket; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.directory.api.util.Hex; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; + +@Component +public class LocalStore { + private static final Logger LOG = LoggerFactory.getLogger(LocalStore.class); + + private static final String DATA_FILE = ".DATA"; + private static final String META_FILE = ".META"; + private static final String PART_SUFFIX = ".PART"; + + private final File root; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map uploadIdToInfo = Maps.newConcurrentMap(); + + public LocalStore(@Value("${" + OSSMockApplication.PROP_ROOT_DIR + ":}") String rootDir) { + Preconditions.checkNotNull(rootDir, "Root directory cannot be null"); + this.root = new File(rootDir); + + root.deleteOnExit(); + root.mkdirs(); + + LOG.info("Root directory of local OSS store is {}", root); + } + + void createBucket(String bucketName) throws IOException { + File newBucket = new File(root, bucketName); + FileUtils.forceMkdir(newBucket); + } + + Bucket getBucket(String bucketName) { + List buckets = findBucketsByFilter(file -> + Files.isDirectory(file) && file.getFileName().endsWith(bucketName)); + + return buckets.size() > 0 ? buckets.get(0) : null; + } + + void deleteBucket(String bucketName) throws IOException { + Bucket bucket = getBucket(bucketName); + Preconditions.checkNotNull(bucket, "Bucket %s shouldn't be null.", bucketName); + + File dir = new File(root, bucket.getName()); + if (Files.walk(dir.toPath()).anyMatch(p -> p.toFile().isFile())) { + throw new LocalOSSController.OssException(409, OSSErrorCode.BUCKET_NOT_EMPTY, + "The bucket you tried to delete is not empty. 
"); + } + + FileUtils.deleteDirectory(dir); + } + + ObjectMetadata putObject(String bucketName, + String fileName, + InputStream dataStream, + String contentType, + String contentEncoding, + Map userMetaData) throws IOException { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists() || bucketDir.mkdirs(); + + File dataFile = new File(bucketDir, fileName + DATA_FILE); + File metaFile = new File(bucketDir, fileName + META_FILE); + if (!dataFile.exists()) { + dataFile.getParentFile().mkdirs(); + dataFile.createNewFile(); + } + + inputStreamToFile(dataStream, dataFile); + + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(dataFile.length()); + metadata.setContentMD5(md5sum(dataFile.getAbsolutePath())); + metadata.setContentType(contentType != null ? contentType : MediaType.APPLICATION_OCTET_STREAM_VALUE); + metadata.setContentEncoding(contentEncoding); + metadata.setDataFile(dataFile.getAbsolutePath()); + metadata.setMetaFile(metaFile.getAbsolutePath()); + + BasicFileAttributes attributes = Files.readAttributes(dataFile.toPath(), BasicFileAttributes.class); + metadata.setLastModificationDate(attributes.lastModifiedTime().toMillis()); + + metadata.setUserMetaData(userMetaData); + + objectMapper.writeValue(metaFile, metadata); + + return metadata; + } + + void deleteObject(String bucketName, String filename) { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + File dataFile = new File(bucketDir, filename + DATA_FILE); + File metaFile = new File(bucketDir, filename + META_FILE); + assert !dataFile.exists() || dataFile.delete(); + assert !metaFile.exists() || metaFile.delete(); + } + + ObjectMetadata getObjectMetadata(String bucketName, String filename) throws IOException { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + File dataFile = new File(bucketDir, filename + DATA_FILE); + if (!dataFile.exists()) { + return null; + } + + File metaFile = new File(bucketDir, filename + META_FILE); + return objectMapper.readValue(metaFile, ObjectMetadata.class); + } + + void prepareMultipartUpload(String bucketName, String fileName, String uploadId) { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + if (!Paths.get(root.getAbsolutePath(), bucketName, uploadId).toFile().mkdirs()) { + throw new IllegalStateException("Directories for storing multipart uploads couldn't be created."); + } + + uploadIdToInfo.put(uploadId, new MultipartUpload(bucketName, fileName, uploadId)); + } + + String putPart(String bucketName, + String uploadId, + String partNumber, + InputStream inputStream) + throws IOException { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + // Must have created the .../bucket/uploadId/ directory. 
+    // (prepareMultipartUpload is what creates that directory, so a missing directory means the upload
+    // was never initiated or has already been aborted.)
+    if (!Paths.get(root.getAbsolutePath(), bucketName, uploadId).toFile().exists()) {
+      throw new IllegalStateException("Multipart upload must be initialized before uploading parts.");
+    }
+
+    File partFile = Paths.get(root.getAbsolutePath(), bucketName, uploadId, partNumber + PART_SUFFIX).toFile();
+
+    inputStreamToFile(inputStream, partFile);
+    return md5sum(partFile.getAbsolutePath());
+  }
+
+  String completeMultipartUpload(String bucketName,
+                                 String filename,
+                                 String uploadId,
+                                 List<LocalOSSController.Part> parts)
+      throws IOException {
+    File bucketDir = new File(root, bucketName);
+    assert bucketDir.exists();
+
+    final MultipartUpload upload = uploadIdToInfo.get(uploadId);
+    assert upload != null;
+    Assert.assertEquals(bucketName, upload.bucket);
+    Assert.assertEquals(filename, upload.object);
+    Assert.assertEquals(uploadId, upload.uploadId);
+
+    synchronized (upload) {
+      if (!uploadIdToInfo.containsKey(uploadId)) {
+        throw new IllegalStateException("Upload " + uploadId + " was aborted or completed concurrently");
+      }
+
+      File dataFile = new File(bucketDir, filename + DATA_FILE);
+      File metaFile = new File(bucketDir, filename + META_FILE);
+      if (!dataFile.exists()) {
+        dataFile.getParentFile().mkdirs();
+        dataFile.createNewFile();
+      }
+
+      // Write the data of all parts into the data file.
+      try (OutputStream out = new FileOutputStream(dataFile)) {
+        for (LocalOSSController.Part part : parts) {
+          // Construct the part file name.
+          File partFile = Paths.get(root.getAbsolutePath(), bucketName, uploadId,
+              part.partNumber + PART_SUFFIX).toFile();
+
+          // Append the content of the given part to the data file.
+          try (InputStream in = new FileInputStream(partFile)) {
+            IOUtils.copy(in, out);
+          }
+        }
+      }
+
+      // Write the meta file.
+      ObjectMetadata metadata = new ObjectMetadata();
+      metadata.setContentLength(dataFile.length());
+      metadata.setContentMD5(md5sum(dataFile.getAbsolutePath()));
+      metadata.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
+      metadata.setDataFile(dataFile.getAbsolutePath());
+      metadata.setMetaFile(metaFile.getAbsolutePath());
+
+      BasicFileAttributes attributes = Files.readAttributes(dataFile.toPath(), BasicFileAttributes.class);
+      metadata.setLastModificationDate(attributes.lastModifiedTime().toMillis());
+      metadata.setUserMetaData(ImmutableMap.of());
+
+      objectMapper.writeValue(metaFile, metadata);
+
+      // Remove the MultipartUpload from the in-memory map.
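+      // (After removal, a concurrent completeMultipartUpload or abortMultipartUpload on the same uploadId
+      // fails the containsKey re-check under the lock instead of touching already-deleted part files.)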
+ uploadIdToInfo.remove(uploadId); + + return metadata.getContentMD5(); + } + } + + void abortMultipartUpload(String bucketName, String filename, String uploadId) { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + MultipartUpload upload = uploadIdToInfo.get(uploadId); + assert upload != null; + + synchronized (upload) { + if (!uploadIdToInfo.containsKey(uploadId)) { + throw new IllegalStateException("Upload " + uploadId + " was aborted or completed concurrently"); + } + + try { + File partDir = Paths.get(root.getAbsolutePath(), bucketName, uploadId).toFile(); + FileUtils.deleteDirectory(partDir); + + File dataFile = new File(bucketDir, filename + DATA_FILE); + FileUtils.deleteQuietly(dataFile); + + File metaFile = new File(bucketDir, filename + META_FILE); + FileUtils.deleteQuietly(metaFile); + + uploadIdToInfo.remove(uploadId); + } catch (IOException e) { + throw new IllegalStateException("Could not delete multipart upload tmp data.", e); + } + } + } + + static String md5sum(String filepath) throws IOException { + try (InputStream is = new FileInputStream(filepath)) { + return md5sum(is); + } + } + + static String md5sum(InputStream is) throws IOException { + MessageDigest md; + try { + md = MessageDigest.getInstance("MD5"); + md.reset(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + byte[] bytes = new byte[1024]; + int numBytes; + + while ((numBytes = is.read(bytes)) != -1) { + md.update(bytes, 0, numBytes); + } + return new String(Hex.encodeHex(md.digest())); + } + + private static void inputStreamToFile(InputStream inputStream, File targetFile) throws IOException { + try (OutputStream outputStream = new FileOutputStream(targetFile)) { + IOUtils.copy(inputStream, outputStream); + } + } + + private List findBucketsByFilter(final DirectoryStream.Filter filter) { + List buckets = Lists.newArrayList(); + + try (DirectoryStream stream = Files.newDirectoryStream(root.toPath(), filter)) { + for (final Path path : stream) { + buckets.add(new Bucket(path.getFileName().toString())); + } + } catch (final IOException e) { + LOG.error("Could not Iterate over Bucket-Folders", e); + } + + return buckets; + } + + private static class MultipartUpload { + private final String bucket; + private final String object; + private final String uploadId; + + private MultipartUpload(String bucket, String object, String uploadId) { + this.bucket = bucket; + this.object = object; + this.uploadId = uploadId; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/OSSMockApplication.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/OSSMockApplication.java new file mode 100644 index 000000000000..43e055d1a3aa --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/OSSMockApplication.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.Banner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.convert.converter.Converter; +import org.springframework.http.MediaType; +import org.springframework.http.converter.xml.MappingJackson2XmlHttpMessageConverter; +import org.springframework.util.StringUtils; +import org.springframework.web.servlet.config.annotation.ContentNegotiationConfigurer; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@SuppressWarnings("checkstyle:AnnotationUseStyle") +@Configuration +@EnableAutoConfiguration(exclude = {SecurityAutoConfiguration.class}, excludeName = { + "org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration" +}) +@ComponentScan +public class OSSMockApplication { + + static final String PROP_ROOT_DIR = "root-dir"; + + static final String PROP_HTTP_PORT = "server.port"; + static final int PORT_HTTP_PORT_DEFAULT = 9393; + + static final String PROP_SILENT = "silent"; + + @Autowired + private ConfigurableApplicationContext context; + + @Autowired + private Config config; + + public static void main(final String[] args) { + start(Maps.newHashMap(), args); + } + + public static OSSMockApplication start(Map properties, String... 
args) { + Map defaults = Maps.newHashMap(); + defaults.put(PROP_HTTP_PORT, PORT_HTTP_PORT_DEFAULT); + + Banner.Mode bannerMode = Banner.Mode.CONSOLE; + + if (Boolean.parseBoolean(String.valueOf(properties.remove("silent")))) { + defaults.put("logging.level.root", "WARN"); + bannerMode = Banner.Mode.OFF; + } + + final ConfigurableApplicationContext ctx = + new SpringApplicationBuilder(OSSMockApplication.class) + .properties(defaults) + .properties(properties) + .bannerMode(bannerMode) + .run(args); + + return ctx.getBean(OSSMockApplication.class); + } + + public void stop() { + SpringApplication.exit(context, () -> 0); + } + + @Configuration + static class Config implements WebMvcConfigurer { + + @Value("${" + PROP_HTTP_PORT + "}") + private int httpPort; + + @Override + public void configureContentNegotiation(final ContentNegotiationConfigurer configurer) { + configurer.defaultContentType(MediaType.APPLICATION_FORM_URLENCODED, MediaType.APPLICATION_XML); + configurer.favorPathExtension(false); + configurer.mediaType("xml", MediaType.TEXT_XML); + } + + @Bean + Converter rangeConverter() { + return new RangeConverter(); + } + + /** + * Creates an HttpMessageConverter for XML. + * + * @return The configured {@link MappingJackson2XmlHttpMessageConverter}. + */ + @Bean + public MappingJackson2XmlHttpMessageConverter getMessageConverter() { + List mediaTypes = Lists.newArrayList(); + mediaTypes.add(MediaType.APPLICATION_XML); + mediaTypes.add(MediaType.APPLICATION_FORM_URLENCODED); + mediaTypes.add(MediaType.APPLICATION_OCTET_STREAM); + + final MappingJackson2XmlHttpMessageConverter xmlConverter = new MappingJackson2XmlHttpMessageConverter(); + xmlConverter.setSupportedMediaTypes(mediaTypes); + + return xmlConverter; + } + } + + private static class RangeConverter implements Converter { + + private static final Pattern REQUESTED_RANGE_PATTERN = Pattern.compile("^bytes=((\\d*)\\-(\\d*))((,\\d*-\\d*)*)"); + + @Override + public Range convert(String rangeString) { + Preconditions.checkNotNull(rangeString, "Range value should not be null."); + + final Range range; + + // parsing a range specification of format: "bytes=start-end" - multiple ranges not supported + final Matcher matcher = REQUESTED_RANGE_PATTERN.matcher(rangeString.trim()); + if (matcher.matches()) { + final String rangeStart = matcher.group(2); + final String rangeEnd = matcher.group(3); + + long start = StringUtils.isEmpty(rangeStart) ? 0L : Long.parseLong(rangeStart); + long end = StringUtils.isEmpty(rangeEnd) ? Long.MAX_VALUE : Long.parseLong(rangeEnd); + range = new Range(start, end); + + if (matcher.groupCount() == 5 && !"".equals(matcher.group(4))) { + throw new IllegalArgumentException( + "Unsupported range specification. Only single range specifications allowed"); + } + if (range.start() < 0) { + throw new IllegalArgumentException( + "Unsupported range specification. A start byte must be supplied"); + } + + if (range.end() != -1 && range.end() < range.start()) { + throw new IllegalArgumentException( + "Range header is malformed. End byte is smaller than start byte."); + } + } else { + throw new IllegalArgumentException( + "Range header is malformed. 
Only bytes supported as range type."); + } + + return range; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/OSSMockRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/OSSMockRule.java new file mode 100644 index 000000000000..bea242e9e945 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/OSSMockRule.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.iceberg.aliyun.oss.OSSTestRule; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OSSMockRule implements OSSTestRule { + private static final Logger LOG = LoggerFactory.getLogger(OSSMockRule.class); + + private final Map properties; + + private OSSMockApplication ossMockApp; + + private OSSMockRule(Map properties) { + this.properties = properties; + } + + @Override + public void start() { + ossMockApp = OSSMockApplication.start(properties); + } + + @Override + public void stop() { + ossMockApp.stop(); + } + + @Override + public OSS createOSSClient() { + String endpoint = String.format("http://localhost:%s", properties.getOrDefault(OSSMockApplication.PROP_HTTP_PORT, + OSSMockApplication.PORT_HTTP_PORT_DEFAULT)); + return new OSSClientBuilder().build(endpoint, "foo", "bar"); + } + + @Override + public String keyPrefix() { + return "mock-objects/"; + } + + private File rootDir() { + Object rootDir = properties.get(OSSMockApplication.PROP_ROOT_DIR); + Preconditions.checkNotNull(rootDir, "Root directory cannot be null"); + return new File(rootDir.toString()); + } + + @Override + public void setUpBucket(String bucket) { + createOSSClient().createBucket(bucket); + } + + @Override + public void tearDownBucket(String bucket) { + try { + Files.walk(rootDir().toPath()) + .filter(p -> p.toFile().isFile()) + .forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + // delete this files quietly. 
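+              // (A failed delete of a leftover object file should not break the teardown; deleting the
+              // bucket below is the step that actually matters for the next test.)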
+ } + }); + + createOSSClient().deleteBucket(bucket); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Map props = Maps.newHashMap(); + + public Builder withRootDir(String rootDir) { + props.put(OSSMockApplication.PROP_ROOT_DIR, rootDir); + return this; + } + + public Builder withHttpPort(int httpPort) { + props.put(OSSMockApplication.PROP_HTTP_PORT, httpPort); + return this; + } + + public Builder silent() { + props.put(OSSMockApplication.PROP_SILENT, true); + return this; + } + + public OSSMockRule build() { + if (props.get(OSSMockApplication.PROP_ROOT_DIR) == null) { + withRootDir(createRootDir().getAbsolutePath()); + } + + return new OSSMockRule(props); + } + + private File createRootDir() { + String rootDir = (String) props.get(OSSMockApplication.PROP_ROOT_DIR); + + File root; + if (rootDir == null || rootDir.isEmpty()) { + root = new File(FileUtils.getTempDirectory(), "oss-mock-file-store" + System.currentTimeMillis()); + } else { + root = new File(rootDir); + } + + root.deleteOnExit(); + root.mkdir(); + + LOG.info("Root directory of local OSS store is {}", root); + + return root; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/ObjectMetadata.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/ObjectMetadata.java new file mode 100644 index 000000000000..95fbd0198824 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/ObjectMetadata.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import java.util.Map; + +public class ObjectMetadata { + + private long contentLength; + + // In millis + private long lastModificationDate; + + private String contentMD5; + + private String contentType; + + private String contentEncoding; + + private Map userMetaData; + + private String dataFile; + + private String metaFile; + + // The following getters and setters are required for Jackson ObjectMapper serialization and deserialization. 
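+  // (Jackson also relies on the implicit no-arg constructor of this class when LocalStore reads the
+  // .META file back via ObjectMapper.readValue.)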
+ + public long getContentLength() { + return contentLength; + } + + public void setContentLength(long contentLength) { + this.contentLength = contentLength; + } + + public long getLastModificationDate() { + return lastModificationDate; + } + + public void setLastModificationDate(long lastModificationDate) { + this.lastModificationDate = lastModificationDate; + } + + public String getContentMD5() { + return contentMD5; + } + + public void setContentMD5(String contentMD5) { + this.contentMD5 = contentMD5; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getContentEncoding() { + return contentEncoding; + } + + public void setContentEncoding(String contentEncoding) { + this.contentEncoding = contentEncoding; + } + + public Map getUserMetaData() { + return userMetaData; + } + + public void setUserMetaData(Map userMetaData) { + this.userMetaData = userMetaData; + } + + public String getDataFile() { + return dataFile; + } + + public void setDataFile(String dataFile) { + this.dataFile = dataFile; + } + + public String getMetaFile() { + return metaFile; + } + + public void setMetaFile(String metaFile) { + this.metaFile = metaFile; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java new file mode 100644 index 000000000000..dcf1291b95f7 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +public class Range { + + private final long start; + private final long end; + + public Range(long start, long end) { + this.start = start; + this.end = end; + } + + public long start() { + return start; + } + + public long end() { + return end; + } + + @Override + public String toString() { + return String.format("%d-%d", start, end); + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/TestLocalOSS.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/TestLocalOSS.java new file mode 100644 index 000000000000..4c174a24a3fc --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/TestLocalOSS.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.model.AbortMultipartUploadRequest; +import com.aliyun.oss.model.CompleteMultipartUploadRequest; +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.InitiateMultipartUploadRequest; +import com.aliyun.oss.model.InitiateMultipartUploadResult; +import com.aliyun.oss.model.PartETag; +import com.aliyun.oss.model.PutObjectResult; +import com.aliyun.oss.model.UploadPartRequest; +import com.aliyun.oss.model.UploadPartResult; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; +import org.apache.commons.io.IOUtils; +import org.apache.iceberg.aliyun.oss.OSSTestRule; +import org.apache.iceberg.aliyun.oss.OSSURI; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.springframework.util.StringUtils; + +public class TestLocalOSS { + + @ClassRule + public static final OSSTestRule OSS_TEST_RULE = OSSMockRule.builder().silent().build(); + + private final OSS oss = OSS_TEST_RULE.createOSSClient(); + private final String bucketName = OSS_TEST_RULE.testBucketName(); + private final String keyPrefix = OSS_TEST_RULE.keyPrefix(); + private final Random random = new Random(1); + + @Before + public void before() { + OSS_TEST_RULE.setUpBucket(bucketName); + } + + @After + public void after() { + OSS_TEST_RULE.tearDownBucket(bucketName); + } + + @Test + public void testBuckets() { + Assert.assertTrue(doesBucketExist(bucketName)); + assertThrows(() -> oss.createBucket(bucketName), OSSErrorCode.BUCKET_ALREADY_EXISTS); + + oss.deleteBucket(bucketName); + Assert.assertFalse(doesBucketExist(bucketName)); + + oss.createBucket(bucketName); + Assert.assertTrue(doesBucketExist(bucketName)); + } + + @Test + public void testDeleteBucket() { + String bucketNotExist = String.format("bucket-not-existing-%s", UUID.randomUUID()); + assertThrows(() -> oss.deleteBucket(bucketNotExist), OSSErrorCode.NO_SUCH_BUCKET); + + byte[] bytes = new byte[2000]; + random.nextBytes(bytes); + + oss.putObject(bucketName, "object1", wrap(bytes)); + + oss.putObject(bucketName, "object2", wrap(bytes)); + + assertThrows(() -> oss.deleteBucket(bucketName), OSSErrorCode.BUCKET_NOT_EMPTY); + + oss.deleteObject(bucketName, "object1"); + assertThrows(() -> oss.deleteBucket(bucketName), OSSErrorCode.BUCKET_NOT_EMPTY); + + oss.deleteObject(bucketName, "object2"); + oss.deleteBucket(bucketName); + Assert.assertFalse(doesBucketExist(bucketName)); + + oss.createBucket(bucketName); + } + + @Test + public void testPutObject() throws IOException { + byte[] bytes = new byte[4 * 1024]; + random.nextBytes(bytes); + + String bucketNotExist = String.format("bucket-not-existing-%s", UUID.randomUUID()); + 
assertThrows(() -> oss.putObject(bucketNotExist, "object", wrap(bytes)), OSSErrorCode.NO_SUCH_BUCKET); + + PutObjectResult result = oss.putObject(bucketName, "object", wrap(bytes)); + Assert.assertEquals(LocalStore.md5sum(wrap(bytes)), result.getETag()); + } + + @Test + public void testDoesObjectExist() { + Assert.assertFalse(oss.doesObjectExist(bucketName, "key")); + + byte[] bytes = new byte[4 * 1024]; + random.nextBytes(bytes); + oss.putObject(bucketName, "key", wrap(bytes)); + + Assert.assertTrue(oss.doesObjectExist(bucketName, "key")); + oss.deleteObject(bucketName, "key"); + } + + @Test + public void testGetObject() throws IOException { + String bucketNotExist = String.format("bucket-not-existing-%s", UUID.randomUUID()); + assertThrows(() -> oss.getObject(bucketNotExist, "key"), OSSErrorCode.NO_SUCH_BUCKET); + + assertThrows(() -> oss.getObject(bucketName, "key"), OSSErrorCode.NO_SUCH_KEY); + + byte[] bytes = new byte[2000]; + random.nextBytes(bytes); + + oss.putObject(bucketName, "key", wrap(bytes)); + + byte[] actual = new byte[2000]; + IOUtils.readFully(oss.getObject(bucketName, "key").getObjectContent(), actual); + + Assert.assertArrayEquals(bytes, actual); + oss.deleteObject(bucketName, "key"); + } + + @Test + public void testMultiUpload() throws Exception { + OSSURI uri = new OSSURI(location("normal-multi-upload-key.dat")); + + // Step.1 Initiate the multipart upload. + InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(uri.bucket(), uri.key()); + InitiateMultipartUploadResult result = oss.initiateMultipartUpload(request); + Assert.assertEquals(bucketName, result.getBucketName()); + Assert.assertEquals(uri.key(), result.getKey()); + + String uploadId = result.getUploadId(); + Assert.assertFalse(StringUtils.isEmpty(uploadId)); + + // Step.2 Upload the parts. + ByteArrayOutputStream out = new ByteArrayOutputStream(); + List<PartETag> completedPartEtags = Lists.newArrayList(); + byte[] data = new byte[100 * 1024]; + for (int i = 1; i <= 10; i++) { + random.nextBytes(data); + completedPartEtags.add(uploadPart(uri, uploadId, i, data)); + out.write(data); + } + + // Step.3 Complete the upload. + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(uri.bucket(), uri.key(), + uploadId, completedPartEtags); + CompleteMultipartUploadResult completeResult = oss.completeMultipartUpload(completeRequest); + Assert.assertEquals(LocalStore.md5sum(wrap(out.toByteArray())), completeResult.getETag()); + } + + @Test + public void testAbortMultiUpload() throws Exception { + OSSURI uri = new OSSURI(location("abort-multi-upload-key.dat")); + + // Step.1 Initiate the multipart upload. + InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(uri.bucket(), uri.key()); + InitiateMultipartUploadResult result = oss.initiateMultipartUpload(request); + Assert.assertEquals(bucketName, result.getBucketName()); + Assert.assertEquals(uri.key(), result.getKey()); + + String uploadId = result.getUploadId(); + Assert.assertFalse(StringUtils.isEmpty(uploadId)); + + // Step.2 Upload the parts. + List<PartETag> completedPartEtags = Lists.newArrayList(); + byte[] data = new byte[100 * 1024]; + for (int i = 1; i <= 10; i++) { + random.nextBytes(data); + completedPartEtags.add(uploadPart(uri, uploadId, i, data)); + } + + // Step.3 Abort the upload.
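+ // Aborting releases the upload id and discards all parts uploaded so far, so no object appears under the target key.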
+ AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(uri.bucket(), uri.key(), uploadId); + oss.abortMultipartUpload(abortRequest); + + // Step.4 Verify that the store still accepts new uploads after the abort. + random.nextBytes(data); + PutObjectResult putObjectResult = oss.putObject(bucketName, "object", wrap(data)); + Assert.assertEquals(LocalStore.md5sum(wrap(data)), putObjectResult.getETag()); + } + + private PartETag uploadPart(OSSURI uri, String uploadId, int partNumber, byte[] data) { + UploadPartRequest request = new UploadPartRequest(uri.bucket(), uri.key(), uploadId, partNumber, + wrap(data), data.length); + + UploadPartResult result = oss.uploadPart(request); + Assert.assertEquals(partNumber, result.getPartNumber()); + Assert.assertEquals(data.length, result.getPartSize()); + + return result.getPartETag(); + } + + private String location(String key) { + return String.format("oss://%s/%s%s", bucketName, keyPrefix, key); + } + + private InputStream wrap(byte[] data) { + return new ByteArrayInputStream(data); + } + + // Probe for existence by attempting to create the bucket: creation fails with BUCKET_ALREADY_EXISTS exactly when the bucket exists; otherwise the probe bucket is deleted again. + private boolean doesBucketExist(String bucket) { + try { + oss.createBucket(bucket); + oss.deleteBucket(bucket); + return false; + } catch (OSSException e) { + if (Objects.equals(e.getErrorCode(), OSSErrorCode.BUCKET_ALREADY_EXISTS)) { + return true; + } + throw e; + } + } + + private static void assertThrows(Runnable runnable, String expectedErrorCode) { + try { + runnable.run(); + Assert.fail("No exception was thrown, expected errorCode: " + expectedErrorCode); + } catch (OSSException e) { + Assert.assertEquals(expectedErrorCode, e.getErrorCode()); + } + } +} diff --git a/build.gradle b/build.gradle index 00ad33463f7a..090448fb5e81 100644 --- a/build.gradle +++ b/build.gradle @@ -259,6 +259,35 @@ project(':iceberg-data') { } } +project(':iceberg-aliyun') { + dependencies { + compile project(':iceberg-api') + compile project(':iceberg-core') + + compileOnly 'com.aliyun.oss:aliyun-sdk-oss' + + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'javax.servlet', module: 'servlet-api' + exclude group: 'com.google.code.gson', module: 'gson' + } + + testCompile 'com.aliyun.oss:aliyun-sdk-oss' + testCompile 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' + testCompile 'org.springframework:spring-web' + testCompile('org.springframework.boot:spring-boot-starter-jetty') { + exclude module: 'logback-classic' + exclude group: 'org.eclipse.jetty.websocket', module: 'javax-websocket-server-impl' + exclude group: 'org.eclipse.jetty.websocket', module: 'websocket-server' + } + testCompile('org.springframework.boot:spring-boot-starter-web') { + exclude module: 'logback-classic' + exclude module: 'spring-boot-starter-logging' + } + testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + } +} + project(':iceberg-aws') { dependencies { compile project(':iceberg-api') @@ -396,6 +425,7 @@ project(':iceberg-flink-runtime') { dependencies { compile project(':iceberg-flink') compile project(':iceberg-aws') + compile project(':iceberg-aliyun') compile(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } @@ -866,6 +896,8 @@ if (jdkVersion == '8') { dependencies { compile project(':iceberg-spark2') compile project(':iceberg-aws') + compile project(':iceberg-aliyun') + + compile 'org.apache.spark:spark-hive_2.11' compile(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } @@ -1024,6 +1056,8 @@
project(':iceberg-spark3-runtime') { compile project(':iceberg-spark3') compile project(':iceberg-spark3-extensions') compile project(':iceberg-aws') + compile project(':iceberg-aliyun') + + compile 'org.apache.spark:spark-hive_2.12' compile(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } diff --git a/settings.gradle b/settings.gradle index 037bdf802385..052624bd0275 100644 --- a/settings.gradle +++ b/settings.gradle @@ -22,6 +22,7 @@ include 'api' include 'common' include 'core' include 'data' +include 'aliyun' include 'aws' include 'flink' include 'flink-runtime' @@ -43,6 +44,7 @@ project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' project(':core').name = 'iceberg-core' project(':data').name = 'iceberg-data' +project(':aliyun').name = 'iceberg-aliyun' project(':aws').name = 'iceberg-aws' project(':flink').name = 'iceberg-flink' project(':flink-runtime').name = 'iceberg-flink-runtime' diff --git a/versions.props b/versions.props index ceee38ac3f15..554b599ca698 100644 --- a/versions.props +++ b/versions.props @@ -11,6 +11,7 @@ org.apache.spark:spark-hive_2.11 = 2.4.7 org.apache.spark:spark-avro_2.11 = 2.4.7 org.apache.spark:spark-hive_2.12 = 3.0.1 org.apache.pig:pig = 0.14.0 +com.aliyun.oss:aliyun-sdk-oss = 3.0.0 com.fasterxml.jackson.*:* = 2.11.4 com.google.guava:guava = 28.0-jre com.github.ben-manes.caffeine:caffeine = 2.7.0 @@ -30,4 +31,6 @@ org.apache.hive:hive-exec = 2.3.8 org.apache.hive:hive-service = 2.3.8 org.apache.tez:tez-dag = 0.8.4 org.apache.tez:tez-mapreduce = 0.8.4 +org.springframework:* = 5.1.10.RELEASE +org.springframework.boot:* = 2.1.9.RELEASE com.adobe.testing:s3mock-junit4 = 2.1.28
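Aside for reviewers, not part of the patch: a minimal sketch of how another JUnit test could stand up the local OSS mock this diff introduces. It uses only the rule API that already appears in TestLocalOSS (OSSMockRule.builder().silent().build(), createOSSClient(), testBucketName(), setUpBucket(), tearDownBucket()); the class name and the bucket/key values are illustrative, and any behavior beyond what TestLocalOSS exercises is an assumption.

package org.apache.iceberg.aliyun.oss.mock;

import com.aliyun.oss.OSS;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.aliyun.oss.OSSTestRule;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class ExampleLocalOSSUsage {
  // Starts the in-process OSS mock once for the whole test class.
  @ClassRule
  public static final OSSTestRule RULE = OSSMockRule.builder().silent().build();

  private final OSS oss = RULE.createOSSClient();

  @Test
  public void putThenCheckExistence() {
    String bucket = RULE.testBucketName();
    RULE.setUpBucket(bucket);
    try {
      // Write a small object and confirm the mock reports it as present.
      byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
      oss.putObject(bucket, "greeting", new ByteArrayInputStream(data));
      Assert.assertTrue(oss.doesObjectExist(bucket, "greeting"));
    } finally {
      // Clean up so later tests start from an empty store.
      RULE.tearDownBucket(bucket);
    }
  }
}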