Merged
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.aws.s3;

import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.iceberg.aws.AwsClientUtil;
import org.apache.iceberg.aws.AwsIntegTestUtil;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.services.s3.S3Client;

/**
 * Long-running tests to ensure multipart upload logic is resilient.
 */
public class S3MultipartUploadTest {

  private final Random random = new Random(1);
  private static S3Client s3;
  private static String bucketName;
  private static String prefix;
  private String objectUri;

  @BeforeClass
  public static void beforeClass() {
    s3 = AwsClientUtil.defaultS3Client();
    bucketName = AwsIntegTestUtil.testBucketName();
    prefix = UUID.randomUUID().toString();
  }

  @AfterClass
  public static void afterClass() {
    AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
  }

  @Before
  public void before() {
    String objectKey = String.format("%s/%s", prefix, UUID.randomUUID().toString());
    objectUri = String.format("s3://%s/%s", bucketName, objectKey);
  }

  @Test
  public void testManyParts_writeWithInt() throws IOException {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
    S3FileIO io = new S3FileIO(() -> s3, properties);
    PositionOutputStream outputStream = io.newOutputFile(objectUri).create();
    for (int i = 0; i < 100; i++) {
Contributor:
Maybe I'm missing something, but I don't think these are actually testing the multipart upload. If we're writing with the OutputStream::write interface, that would only be writing a single byte, so 100 bytes in this case. That wouldn't be enough to trigger the multipart behavior.

I think that's the case for most of the tests I see here. You might want to look at the S3Outputstream test because you can actually validate the operations performed like this: https://github.com/apache/iceberg/blob/master/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java#L109
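The operation-count style of validation mentioned above can be illustrated without a full mock framework. The following is a hypothetical, simplified sketch (not the actual S3OutputStreamTest code): a hand-rolled counting stub that tracks how many part uploads a writer would issue, which is the kind of assertion that proves multipart behavior actually triggered.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count how many "uploadPart"-style flushes a writer
// issues, so a test can assert that multipart behavior actually triggered.
public class CountingUploader {
    private final AtomicInteger uploadPartCalls = new AtomicInteger();
    private final AtomicInteger putObjectCalls = new AtomicInteger();
    private final int partSize;
    private long buffered = 0;

    CountingUploader(int partSize) {
        this.partSize = partSize;
    }

    void write(byte[] bytes) {
        buffered += bytes.length;
        while (buffered >= partSize) {
            // a full part is staged: simulate one uploadPart call
            uploadPartCalls.incrementAndGet();
            buffered -= partSize;
        }
    }

    void close() {
        if (uploadPartCalls.get() == 0) {
            // small file: would be a single PutObject, no multipart
            putObjectCalls.incrementAndGet();
        } else if (buffered > 0) {
            // flush the remainder as the final part
            uploadPartCalls.incrementAndGet();
        }
    }

    int uploadParts() {
        return uploadPartCalls.get();
    }

    int putObjects() {
        return putObjectCalls.get();
    }

    public static void main(String[] args) {
        int partSize = 5 * 1024 * 1024;
        CountingUploader uploader = new CountingUploader(partSize);
        for (int i = 0; i < 3; i++) {
            uploader.write(new byte[partSize]);
        }
        uploader.close();
        // three full parts were written, and no single-PUT fallback occurred
        System.out.println(uploader.uploadParts() + " " + uploader.putObjects());
    }
}
```

A test written this way asserts on the recorded call counts rather than on data landing in a real bucket, which is what distinguishes it from the integration tests in this PR.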

Contributor Author (@jackye1995, Dec 10, 2020):
> that would only be writing a single byte, so 100 bytes in this case

There is an internal loop for (int j = 0; j < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; j++).

> You might want to look at the S3Outputstream test because you can actually validate the operations performed like this

the tests here are trying to verify against the actual result in S3 instead of verifying the number of calls, because I know those are verified in the tests you referenced. But I think I'm not being very DRY here; let me refactor the tests a little bit.
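The refactor hinted at above could extract the repeated write-then-verify pattern into shared helpers. The following is a hypothetical sketch of that shape, not the PR's actual refactor: a ByteArrayOutputStream stands in for the real S3 output stream, and the helper names (`writeParts`, `validate`) are invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical DRY'd test helpers: write `parts` parts of `partSize` bytes of a
// fixed value, then validate both total length and contents in one place.
public class WriteAndValidate {

    static void writeParts(OutputStream out, int parts, int partSize, byte value)
            throws IOException {
        byte[] part = new byte[partSize];
        Arrays.fill(part, value);
        for (int i = 0; i < parts; i++) {
            out.write(part);
        }
        out.close();
    }

    static void validate(InputStream in, long expectedLength, byte expectedValue)
            throws IOException {
        long length = 0;
        int cur;
        while ((cur = in.read()) != -1) {
            if (cur != (expectedValue & 0xFF)) {
                throw new AssertionError("unexpected byte " + cur + " at offset " + length);
            }
            length++;
        }
        if (length != expectedLength) {
            throw new AssertionError("expected " + expectedLength + " bytes, got " + length);
        }
        in.close();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeParts(out, 3, 1024, (byte) 6);
        validate(new ByteArrayInputStream(out.toByteArray()), 3 * 1024, (byte) 6);
        System.out.println("ok");
    }
}
```

With helpers like these, each `@Test` method would reduce to configuring `AwsProperties`, creating the stream, and two helper calls, which also lets one test cover both the length and contents assertions.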

Contributor (@danielcweeks, Dec 10, 2020):
No, I just mixed up this test and the next one and missed the inner loop here. However, you might be able to combine some of the upload and content validation into a single test, but it looks like you already have some thoughts on it, so I'll wait.

I guess there are two minor questions I have:

  1. Is it reasonable to be creating large files in S3 as part of the integration test (I'm not clear on whether we run these as part of our actual build or it's left up to users to run in their own accounts).
  2. Are there cases where we think the s3mock wouldn't catch something that these tests would?

Contributor Author:

> 1. Is it reasonable to be creating large files in S3 as part of the integration test (I'm not clear on if we run these as part of our actual build or it's left up to users to run in their own accounts).

I don't expect this to be run for every actual build, and the tests take quite a while to complete, so it's mostly for users to run in their own accounts. With that being said, I am in the process of potentially getting an account to run these tests for all PRs committing to the aws module, with the cost covered.

> 2. Are there cases where we think the s3mock wouldn't catch something that these tests would?

It is hard to say how different the actual S3 is from S3Mock, so this serves as a line of defense to catch potentially different behaviors and potential errors during non-local network calls.

Contributor Author:

@danielcweeks refactored tests, please let me know if it looks good to you.

Contributor:

Yeah, looks good. It seems for #2 there are a number of things we won't be able to test against s3mock (like STS), so it makes sense to add these integration tests once we have an account.

Thanks!

      for (int j = 0; j < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; j++) {
        outputStream.write(random.nextInt());
      }
    }
    outputStream.close();
    Assert.assertEquals(100 * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN,
        io.newInputFile(objectUri).getLength());
  }

  @Test
  public void testManyParts_writeWithBytes() throws IOException {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
    S3FileIO io = new S3FileIO(() -> s3, properties);
    PositionOutputStream outputStream = io.newOutputFile(objectUri).create();
    byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN];
    for (int i = 0; i < 100; i++) {
      random.nextBytes(bytes);
      outputStream.write(bytes);
    }
    outputStream.close();
    Assert.assertEquals(100 * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN,
        io.newInputFile(objectUri).getLength());
  }

  @Test
  public void testContents_writeWithInt() throws IOException {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
    S3FileIO io = new S3FileIO(() -> s3, properties);
    PositionOutputStream outputStream = io.newOutputFile(objectUri).create();
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; j++) {
        outputStream.write(6);
      }
    }
    outputStream.close();
    SeekableInputStream inputStream = io.newInputFile(objectUri).newStream();
    int cur;
    while ((cur = inputStream.read()) != -1) {
      Assert.assertEquals(6, cur);
    }
    inputStream.close();
  }

  @Test
  public void testContents_writeWithBytes() throws IOException {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
    S3FileIO io = new S3FileIO(() -> s3, properties);
    PositionOutputStream outputStream = io.newOutputFile(objectUri).create();
    byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN];
    for (int i = 0; i < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; i++) {
      bytes[i] = 6;
    }
    for (int i = 0; i < 10; i++) {
      outputStream.write(bytes);
    }
    outputStream.close();
    SeekableInputStream inputStream = io.newInputFile(objectUri).newStream();
    int cur;
    while ((cur = inputStream.read()) != -1) {
      Assert.assertEquals(6, cur);
    }
    inputStream.close();
  }

  @Test
  public void testUploadRemainder() throws IOException {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
    properties.setS3FileIoMultipartThresholdFactor(1);
    S3FileIO io = new S3FileIO(() -> s3, properties);
    PositionOutputStream outputStream = io.newOutputFile(objectUri).create();
    long length = 3 * AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN + 2 * 1024 * 1024;
    for (int i = 0; i < length; i++) {
      outputStream.write(random.nextInt());
    }
    outputStream.close();
    Assert.assertEquals(length, io.newInputFile(objectUri).getLength());
  }

  @Test
  public void testParallelUpload() throws IOException {
    AwsProperties properties = new AwsProperties();
    properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN);
    properties.setS3FileIoMultipartUploadThreads(16);
    S3FileIO io = new S3FileIO(() -> s3, properties);
    IntStream.range(0, 16).parallel().forEach(d -> {
      byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN];
      for (int i = 0; i < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; i++) {
        bytes[i] = (byte) d;
      }
      PositionOutputStream outputStream = io.newOutputFile(objectUri + "_" + d).create();
      try {
        for (int i = 0; i < 3; i++) {
          outputStream.write(bytes);
        }
        outputStream.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });

    for (int i = 0; i < 16; i++) {
      String fileUri = objectUri + "_" + i;
      InputFile inputFile = io.newInputFile(fileUri);
      Assert.assertEquals(3 * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN, inputFile.getLength());
      int cur;
      InputStream stream = inputFile.newStream();
      while ((cur = stream.read()) != -1) {
        Assert.assertEquals(i, cur);
      }
      stream.close();
    }
  }
}
65 changes: 48 additions & 17 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -28,6 +28,8 @@ public class AwsProperties {

/**
* Type of S3 Server side encryption used, default to {@link AwsProperties#S3FILEIO_SSE_TYPE_NONE}.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html
*/
public static final String S3FILEIO_SSE_TYPE = "s3fileio.sse.type";

@@ -38,18 +40,21 @@ public class AwsProperties {

/**
* S3 SSE-KMS encryption.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
*/
public static final String S3FILEIO_SSE_TYPE_KMS = "kms";

/**
* S3 SSE-S3 encryption.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
*/
public static final String S3FILEIO_SSE_TYPE_S3 = "s3";

/**
* S3 SSE-C encryption.
* <p>
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
*/
public static final String S3FILEIO_SSE_TYPE_CUSTOM = "custom";
@@ -70,6 +75,7 @@ public class AwsProperties {
/**
* The ID of the Glue Data Catalog where the tables reside.
* If none is provided, Glue automatically uses the caller's AWS account ID by default.
* <p>
* For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
*/
public static final String GLUE_CATALOG_ID = "gluecatalog.id";
@@ -84,41 +90,45 @@ public class AwsProperties {
public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false;

/**
* Number of threads to use for uploading parts to S3 (shared pool across all output streams).
* Number of threads to use for uploading parts to S3 (shared pool across all output streams),
* default to {@link Runtime#availableProcessors()}
*/
public static final String S3FILEIO_MULTIPART_UPLOAD_THREADS = "s3fileio.multipart.num-threads";

/**
* The size of a single part for multipart upload requests (default: 32MB).
* The size of a single part for multipart upload requests in bytes (default: 32MB).
* Based on S3 requirements, the part size must be at least 5MB.
* To ensure performance of the reader and writer, the part size must be less than 2GB.
* <p>
* For more details, see https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
*/
public static final String S3FILEIO_MULTIPART_SIZE = "s3fileio.multipart.part.size";
public static final int S3FILEIO_MULTIPART_SIZE_DEFAULT = 32 * 1024 * 1024;
public static final int S3FILEIO_MULTIPART_SIZE_MIN = 5 * 1024 * 1024;

/**
* The threshold expressed as a factor times the multipart size at which to
* switch from uploading using a single put object request to uploading using multipart upload
* (default: 1.5).
*/
public static final String S3FILEIO_MULTIPART_THRESHOLD_FACTOR = "s3fileio.multipart.threshold";
public static final double S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT = 1.5;

/**
* Location to put staging files for upload to S3.
* Location to put staging files for upload to S3, default to the temp directory defined by the java.io.tmpdir system property.
*/
public static final String S3FILEIO_STAGING_DIRECTORY = "s3fileio.staging.dir";

/**
* Used to set canned access control list for S3 client to use during write.
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html
* Used to configure canned access control list (ACL) for S3 client to use during write.
* If not set, ACL will not be set for requests.
* <p>
* The input must be one of {@link software.amazon.awssdk.services.s3.model.ObjectCannedACL},
* such as 'public-read-write'
* If not set, ACL will not be set for requests.
* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html
*/
public static final String S3FILEIO_ACL = "s3fileio.acl";


static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024;
static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024;
static final double DEFAULT_MULTIPART_THRESHOLD = 1.5;

private String s3FileIoSseType;
private String s3FileIoSseKey;
private String s3FileIoSseMd5;
@@ -138,8 +148,8 @@ public AwsProperties() {
this.s3FileIoAcl = null;

this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors();
this.s3FileIoMultiPartSize = DEFAULT_MULTIPART_SIZE;
this.s3FileIoMultipartThresholdFactor = DEFAULT_MULTIPART_THRESHOLD;
this.s3FileIoMultiPartSize = S3FILEIO_MULTIPART_SIZE_DEFAULT;
this.s3FileIoMultipartThresholdFactor = S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT;
this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");

this.glueCatalogId = null;
@@ -163,16 +173,21 @@ public AwsProperties(Map<String, String> properties) {
this.s3FileIoMultipartUploadThreads = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_UPLOAD_THREADS,
Runtime.getRuntime().availableProcessors());

this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE,
DEFAULT_MULTIPART_SIZE);
try {
this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE,
S3FILEIO_MULTIPART_SIZE_DEFAULT);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(String.format(
"Input malformed or exceeded maximum multipart upload size 5GB: %s",
properties.get(S3FILEIO_MULTIPART_SIZE)), e);
}

this.s3FileIoMultipartThresholdFactor = PropertyUtil.propertyAsDouble(properties,
S3FILEIO_MULTIPART_THRESHOLD_FACTOR, DEFAULT_MULTIPART_THRESHOLD);
S3FILEIO_MULTIPART_THRESHOLD_FACTOR, S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT);

Preconditions.checkArgument(s3FileIoMultipartThresholdFactor >= 1.0,
"Multipart threshold factor must be >= 1.0");

Preconditions.checkArgument(s3FileIoMultiPartSize >= MIN_MULTIPART_UPLOAD_SIZE,
Preconditions.checkArgument(s3FileIoMultiPartSize >= S3FILEIO_MULTIPART_SIZE_MIN,
"Minimum multipart upload part size must be at least 5 MB.");

this.s3fileIoStagingDirectory = PropertyUtil.propertyAsString(properties, S3FILEIO_STAGING_DIRECTORY,
@@ -228,18 +243,34 @@ public int s3FileIoMultipartUploadThreads() {
return s3FileIoMultipartUploadThreads;
}

public void setS3FileIoMultipartUploadThreads(int threads) {
this.s3FileIoMultipartUploadThreads = threads;
}

public int s3FileIoMultiPartSize() {
return s3FileIoMultiPartSize;
}

public void setS3FileIoMultiPartSize(int size) {
this.s3FileIoMultiPartSize = size;
}

public double s3FileIOMultipartThresholdFactor() {
return s3FileIoMultipartThresholdFactor;
}

public void setS3FileIoMultipartThresholdFactor(double factor) {
this.s3FileIoMultipartThresholdFactor = factor;
}

public String getS3fileIoStagingDirectory() {
return s3fileIoStagingDirectory;
}

public void setS3fileIoStagingDirectory(String directory) {
this.s3fileIoStagingDirectory = directory;
}

public ObjectCannedACL s3FileIoAcl() {
return this.s3FileIoAcl;
}
4 changes: 3 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,8 +33,10 @@

/**
* FileIO implementation backed by S3.
* <p>
* Locations used must follow the conventions for S3 URIs (e.g. s3://bucket/path...).
* See {@link S3URI#VALID_SCHEMES} for the list of supported S3 URI schemes.
* URIs with schemes s3a, s3n, https are also treated as s3 file paths.
* Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
public class S3FileIO implements FileIO {
private final SerializableSupplier<S3Client> s3;
@@ -85,6 +85,7 @@ class S3OutputStream extends PositionOutputStream {
private long pos = 0;
private boolean closed = false;

@SuppressWarnings({"StaticAssignmentInConstructor", "StaticGuardedByInstance"})
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) throws IOException {
if (executorService == null) {
synchronized (this) {
@@ -30,6 +30,7 @@
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

@SuppressWarnings("UnnecessaryLambda")
public class S3RequestUtil {

private static final Function<ServerSideEncryption, S3Request.Builder> NULL_SSE_SETTER = sse -> null;