Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.s3;

import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.iceberg.aws.AwsClientUtil;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.services.s3.S3Client;

/**
* Long-running tests to ensure multipart upload logic is resilient
*/
public class S3MultipartUploadTest {

private final Random random = new Random(1);
private static S3Client s3;
private static String bucketName;
private static String prefix;
private static AwsProperties properties;
private static S3FileIO io;
private String objectUri;

@BeforeClass
public static void beforeClass() {
s3 = AwsClientUtil.defaultS3Client();
bucketName = AwsIntegTestUtil.testBucketName();
prefix = UUID.randomUUID().toString();
properties = new AwsProperties();
properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
io = new S3FileIO(() -> s3, properties);
}

@AfterClass
public static void afterClass() {
AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
}

@Before
public void before() {
String objectKey = String.format("%s/%s", prefix, UUID.randomUUID().toString());
objectUri = String.format("s3://%s/%s", bucketName, objectKey);
}

@Test
public void testManyParts_writeWithInt() throws IOException {
int parts = 200;
writeInts(objectUri, parts, random::nextInt);
Assert.assertEquals(parts * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN,
io.newInputFile(objectUri).getLength());
}

@Test
public void testManyParts_writeWithBytes() throws IOException {
int parts = 200;
byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN];
writeBytes(objectUri, parts, () -> {
random.nextBytes(bytes);
return bytes;
});
Assert.assertEquals(parts * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN,
io.newInputFile(objectUri).getLength());
}

@Test
public void testContents_writeWithInt() throws IOException {
writeInts(objectUri, 10, () -> 6);
verifyInts(objectUri, () -> 6);
}

@Test
public void testContents_writeWithBytes() throws IOException {
byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN];
for (int i = 0; i < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; i++) {
bytes[i] = 6;
}
writeBytes(objectUri, 10, () -> bytes);
verifyInts(objectUri, () -> 6);
}

@Test
public void testUploadRemainder() throws IOException {
long length = 3 * AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN + 2 * 1024 * 1024;
writeInts(objectUri, 1, length, random::nextInt);
Assert.assertEquals(length, io.newInputFile(objectUri).getLength());
}

@Test
public void testParallelUpload() throws IOException {
int threads = 16;
IntStream.range(0, threads).parallel()
.forEach(d -> writeInts(objectUri + d, 3, () -> d));

for (int i = 0; i < threads; i++) {
final int d = i;
verifyInts(objectUri + d, () -> d);
}
}

private void writeInts(String fileUri, int parts, Supplier<Integer> writer) {
writeInts(fileUri, parts, AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN, writer);
}

private void writeInts(String fileUri, int parts, long partSize, Supplier<Integer> writer) {
try (PositionOutputStream outputStream = io.newOutputFile(fileUri).create()) {
for (int i = 0; i < parts; i++) {
for (long j = 0; j < partSize; j++) {
outputStream.write(writer.get());
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void verifyInts(String fileUri, Supplier<Integer> verifier) {
try (SeekableInputStream inputStream = io.newInputFile(fileUri).newStream()) {
int cur;
while ((cur = inputStream.read()) != -1) {
Assert.assertEquals(verifier.get().intValue(), cur);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void writeBytes(String fileUri, int parts, Supplier<byte[]> writer) {
try (PositionOutputStream outputStream = io.newOutputFile(fileUri).create()) {
for (int i = 0; i < parts; i++) {
outputStream.write(writer.get());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
65 changes: 48 additions & 17 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class AwsProperties {

/**
* Type of S3 Server side encryption used, default to {@link AwsProperties#S3FILEIO_SSE_TYPE_NONE}.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html
*/
public static final String S3FILEIO_SSE_TYPE = "s3fileio.sse.type";

Expand All @@ -38,18 +40,21 @@ public class AwsProperties {

/**
* S3 SSE-KMS encryption.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
*/
public static final String S3FILEIO_SSE_TYPE_KMS = "kms";

/**
* S3 SSE-S3 encryption.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
*/
public static final String S3FILEIO_SSE_TYPE_S3 = "s3";

/**
* S3 SSE-C encryption.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
*/
public static final String S3FILEIO_SSE_TYPE_CUSTOM = "custom";
Expand All @@ -70,6 +75,7 @@ public class AwsProperties {
/**
* The ID of the Glue Data Catalog where the tables reside.
* If none is provided, Glue automatically uses the caller's AWS account ID by default.
* <p>
* For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
*/
public static final String GLUE_CATALOG_ID = "gluecatalog.id";
Expand All @@ -84,41 +90,45 @@ public class AwsProperties {
public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false;

/**
* Number of threads to use for uploading parts to S3 (shared pool across all output streams).
* Number of threads to use for uploading parts to S3 (shared pool across all output streams),
* default to {@link Runtime#availableProcessors()}
*/
public static final String S3FILEIO_MULTIPART_UPLOAD_THREADS = "s3fileio.multipart.num-threads";

/**
* The size of a single part for multipart upload requests (default: 32MB).
* The size of a single part for multipart upload requests in bytes (default: 32MB).
* based on S3 requirement, the part size must be at least 5MB.
* Too ensure performance of the reader and writer, the part size must be less than 2GB.
* <p>
* For more details, see https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
*/
public static final String S3FILEIO_MULTIPART_SIZE = "s3fileio.multipart.part.size";
public static final int S3FILEIO_MULTIPART_SIZE_DEFAULT = 32 * 1024 * 1024;
public static final int S3FILEIO_MULTIPART_SIZE_MIN = 5 * 1024 * 1024;

/**
* The threshold expressed as a factor times the multipart size at which to
* switch from uploading using a single put object request to uploading using multipart upload
* (default: 1.5).
*/
public static final String S3FILEIO_MULTIPART_THRESHOLD_FACTOR = "s3fileio.multipart.threshold";
public static final double S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT = 1.5;

/**
* Location to put staging files for upload to S3.
* Location to put staging files for upload to S3, default to temp directory set in java.io.tmpdir.
*/
public static final String S3FILEIO_STAGING_DIRECTORY = "s3fileio.staging.dir";

/**
* Used to set canned access control list for S3 client to use during write.
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html
* Used to configure canned access control list (ACL) for S3 client to use during write.
* If not set, ACL will not be set for requests.
* <p>
* The input must be one of {@link software.amazon.awssdk.services.s3.model.ObjectCannedACL},
* such as 'public-read-write'
* If not set, ACL will not be set for requests.
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html
*/
public static final String S3FILEIO_ACL = "s3fileio.acl";


static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024;
static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024;
static final double DEFAULT_MULTIPART_THRESHOLD = 1.5;

private String s3FileIoSseType;
private String s3FileIoSseKey;
private String s3FileIoSseMd5;
Expand All @@ -138,8 +148,8 @@ public AwsProperties() {
this.s3FileIoAcl = null;

this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors();
this.s3FileIoMultiPartSize = DEFAULT_MULTIPART_SIZE;
this.s3FileIoMultipartThresholdFactor = DEFAULT_MULTIPART_THRESHOLD;
this.s3FileIoMultiPartSize = S3FILEIO_MULTIPART_SIZE_DEFAULT;
this.s3FileIoMultipartThresholdFactor = S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT;
this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");

this.glueCatalogId = null;
Expand All @@ -163,16 +173,21 @@ public AwsProperties(Map<String, String> properties) {
this.s3FileIoMultipartUploadThreads = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_UPLOAD_THREADS,
Runtime.getRuntime().availableProcessors());

this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE,
DEFAULT_MULTIPART_SIZE);
try {
this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE,
S3FILEIO_MULTIPART_SIZE_DEFAULT);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Input malformed or exceeded maximum multipart upload size 5GB: %s" +
properties.get(S3FILEIO_MULTIPART_SIZE));
}

this.s3FileIoMultipartThresholdFactor = PropertyUtil.propertyAsDouble(properties,
S3FILEIO_MULTIPART_THRESHOLD_FACTOR, DEFAULT_MULTIPART_THRESHOLD);
S3FILEIO_MULTIPART_THRESHOLD_FACTOR, S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT);

Preconditions.checkArgument(s3FileIoMultipartThresholdFactor >= 1.0,
"Multipart threshold factor must be >= to 1.0");

Preconditions.checkArgument(s3FileIoMultiPartSize >= MIN_MULTIPART_UPLOAD_SIZE,
Preconditions.checkArgument(s3FileIoMultiPartSize >= S3FILEIO_MULTIPART_SIZE_MIN,
"Minimum multipart upload object size must be larger than 5 MB.");

this.s3fileIoStagingDirectory = PropertyUtil.propertyAsString(properties, S3FILEIO_STAGING_DIRECTORY,
Expand Down Expand Up @@ -228,18 +243,34 @@ public int s3FileIoMultipartUploadThreads() {
return s3FileIoMultipartUploadThreads;
}

public void setS3FileIoMultipartUploadThreads(int threads) {
this.s3FileIoMultipartUploadThreads = threads;
}

public int s3FileIoMultiPartSize() {
return s3FileIoMultiPartSize;
}

public void setS3FileIoMultiPartSize(int size) {
this.s3FileIoMultiPartSize = size;
}

public double s3FileIOMultipartThresholdFactor() {
return s3FileIoMultipartThresholdFactor;
}

public void setS3FileIoMultipartThresholdFactor(double factor) {
this.s3FileIoMultipartThresholdFactor = factor;
}

public String getS3fileIoStagingDirectory() {
return s3fileIoStagingDirectory;
}

public void setS3fileIoStagingDirectory(String directory) {
this.s3fileIoStagingDirectory = directory;
}

public ObjectCannedACL s3FileIoAcl() {
return this.s3FileIoAcl;
}
Expand Down
4 changes: 3 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@

/**
* FileIO implementation backed by S3.
* <p>
* Locations used must follow the conventions for S3 URIs (e.g. s3://bucket/path...).
* See {@link S3URI#VALID_SCHEMES} for the list of supported S3 URI schemes.
* URIs with schemes s3a, s3n, https are also treated as s3 file paths.
* Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO implements FileIO {
private final SerializableSupplier<S3Client> s3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class S3OutputStream extends PositionOutputStream {
private long pos = 0;
private boolean closed = false;

@SuppressWarnings({"StaticAssignmentInConstructor", "StaticGuardedByInstance"})
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) throws IOException {
if (executorService == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

@SuppressWarnings("UnnecessaryLambda")
public class S3RequestUtil {

private static final Function<ServerSideEncryption, S3Request.Builder> NULL_SSE_SETTER = sse -> null;
Expand Down
30 changes: 30 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/AwsPropertiesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,34 @@ public void testS3FileIoAcl_unknownType() {
() -> new AwsProperties(map));
}

@Test
public void testS3MultipartSizeTooSmall() {
Map<String, String> map = Maps.newHashMap();
map.put(AwsProperties.S3FILEIO_MULTIPART_SIZE, "1");
AssertHelpers.assertThrows("should not accept small part size",
IllegalArgumentException.class,
"Minimum multipart upload object size must be larger than 5 MB",
() -> new AwsProperties(map));
}

@Test
public void testS3MultipartSizeTooLarge() {
Map<String, String> map = Maps.newHashMap();
map.put(AwsProperties.S3FILEIO_MULTIPART_SIZE, "5368709120"); // 5GB
AssertHelpers.assertThrows("should not accept too big part size",
IllegalArgumentException.class,
"Input malformed or exceeded maximum multipart upload size 5GB",
() -> new AwsProperties(map));
}

@Test
public void testS3MultipartThresholdFactorLessThanOne() {
Map<String, String> map = Maps.newHashMap();
map.put(AwsProperties.S3FILEIO_MULTIPART_THRESHOLD_FACTOR, "0.9");
AssertHelpers.assertThrows("should not accept factor less than 1",
IllegalArgumentException.class,
"Multipart threshold factor must be >= to 1.0",
() -> new AwsProperties(map));
}

}