diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3MultipartUploadTest.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3MultipartUploadTest.java new file mode 100644 index 000000000000..5f636aded59d --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3MultipartUploadTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.aws.s3; + +import java.io.IOException; +import java.util.Random; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import org.apache.iceberg.aws.AwsClientUtil; +import org.apache.iceberg.aws.AwsIntegTestUtil; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.io.SeekableInputStream; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * Long-running tests to ensure multipart upload logic is resilient + */ +public class S3MultipartUploadTest { + + private final Random random = new Random(1); + private static S3Client s3; + private static String bucketName; + private static String prefix; + private static AwsProperties properties; + private static S3FileIO io; + private String objectUri; + + @BeforeClass + public static void beforeClass() { + s3 = AwsClientUtil.defaultS3Client(); + bucketName = AwsIntegTestUtil.testBucketName(); + prefix = UUID.randomUUID().toString(); + properties = new AwsProperties(); + properties.setS3FileIoMultiPartSize(AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN); + io = new S3FileIO(() -> s3, properties); + } + + @AfterClass + public static void afterClass() { + AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix); + } + + @Before + public void before() { + String objectKey = String.format("%s/%s", prefix, UUID.randomUUID().toString()); + objectUri = String.format("s3://%s/%s", bucketName, objectKey); + } + + @Test + public void testManyParts_writeWithInt() throws IOException { + int parts = 200; + writeInts(objectUri, parts, random::nextInt); + Assert.assertEquals(parts * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN, + io.newInputFile(objectUri).getLength()); + } + + @Test + public void testManyParts_writeWithBytes() throws IOException { + 
int parts = 200; + byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN]; + writeBytes(objectUri, parts, () -> { + random.nextBytes(bytes); + return bytes; + }); + Assert.assertEquals(parts * (long) AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN, + io.newInputFile(objectUri).getLength()); + } + + @Test + public void testContents_writeWithInt() throws IOException { + writeInts(objectUri, 10, () -> 6); + verifyInts(objectUri, () -> 6); + } + + @Test + public void testContents_writeWithBytes() throws IOException { + byte[] bytes = new byte[AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN]; + for (int i = 0; i < AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN; i++) { + bytes[i] = 6; + } + writeBytes(objectUri, 10, () -> bytes); + verifyInts(objectUri, () -> 6); + } + + @Test + public void testUploadRemainder() throws IOException { + long length = 3 * AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN + 2 * 1024 * 1024; + writeInts(objectUri, 1, length, random::nextInt); + Assert.assertEquals(length, io.newInputFile(objectUri).getLength()); + } + + @Test + public void testParallelUpload() throws IOException { + int threads = 16; + IntStream.range(0, threads).parallel() + .forEach(d -> writeInts(objectUri + d, 3, () -> d)); + + for (int i = 0; i < threads; i++) { + final int d = i; + verifyInts(objectUri + d, () -> d); + } + } + + private void writeInts(String fileUri, int parts, Supplier writer) { + writeInts(fileUri, parts, AwsProperties.S3FILEIO_MULTIPART_SIZE_MIN, writer); + } + + private void writeInts(String fileUri, int parts, long partSize, Supplier writer) { + try (PositionOutputStream outputStream = io.newOutputFile(fileUri).create()) { + for (int i = 0; i < parts; i++) { + for (long j = 0; j < partSize; j++) { + outputStream.write(writer.get()); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void verifyInts(String fileUri, Supplier verifier) { + try (SeekableInputStream inputStream = io.newInputFile(fileUri).newStream()) { + int 
cur; + while ((cur = inputStream.read()) != -1) { + Assert.assertEquals(verifier.get().intValue(), cur); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void writeBytes(String fileUri, int parts, Supplier writer) { + try (PositionOutputStream outputStream = io.newOutputFile(fileUri).create()) { + for (int i = 0; i < parts; i++) { + outputStream.write(writer.get()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index 4f460adcab3b..d7ac5a023de8 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -28,6 +28,8 @@ public class AwsProperties { /** * Type of S3 Server side encryption used, default to {@link AwsProperties#S3FILEIO_SSE_TYPE_NONE}. + *

+ * For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html */ public static final String S3FILEIO_SSE_TYPE = "s3fileio.sse.type"; @@ -38,18 +40,21 @@ public class AwsProperties { /** * S3 SSE-KMS encryption. + *

* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html */ public static final String S3FILEIO_SSE_TYPE_KMS = "kms"; /** * S3 SSE-S3 encryption. + *

* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html */ public static final String S3FILEIO_SSE_TYPE_S3 = "s3"; /** * S3 SSE-C encryption. + *

* For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html */ public static final String S3FILEIO_SSE_TYPE_CUSTOM = "custom"; @@ -70,6 +75,7 @@ public class AwsProperties { /** * The ID of the Glue Data Catalog where the tables reside. * If none is provided, Glue automatically uses the caller's AWS account ID by default. + *

* For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html */ public static final String GLUE_CATALOG_ID = "gluecatalog.id"; @@ -84,14 +90,21 @@ public class AwsProperties { public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false; /** - * Number of threads to use for uploading parts to S3 (shared pool across all output streams). + * Number of threads to use for uploading parts to S3 (shared pool across all output streams), + * defaults to {@link Runtime#availableProcessors()} */ public static final String S3FILEIO_MULTIPART_UPLOAD_THREADS = "s3fileio.multipart.num-threads"; /** - * The size of a single part for multipart upload requests (default: 32MB). + * The size of a single part for multipart upload requests in bytes (default: 32MB). + * Based on S3 requirements, the part size must be at least 5MB. + * To ensure performance of the reader and writer, the part size must be less than 2GB. + *

+ * For more details, see https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html */ public static final String S3FILEIO_MULTIPART_SIZE = "s3fileio.multipart.part.size"; + public static final int S3FILEIO_MULTIPART_SIZE_DEFAULT = 32 * 1024 * 1024; + public static final int S3FILEIO_MULTIPART_SIZE_MIN = 5 * 1024 * 1024; /** * The threshold expressed as a factor times the multipart size at which to @@ -99,26 +112,23 @@ public class AwsProperties { * (default: 1.5). */ public static final String S3FILEIO_MULTIPART_THRESHOLD_FACTOR = "s3fileio.multipart.threshold"; + public static final double S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT = 1.5; /** - * Location to put staging files for upload to S3. + * Location to put staging files for upload to S3, default to temp directory set in java.io.tmpdir. */ public static final String S3FILEIO_STAGING_DIRECTORY = "s3fileio.staging.dir"; /** - * Used to set canned access control list for S3 client to use during write. - * For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html + * Used to configure canned access control list (ACL) for S3 client to use during write. + * If not set, ACL will not be set for requests. + *

* The input must be one of {@link software.amazon.awssdk.services.s3.model.ObjectCannedACL}, + * such as 'public-read-write' - * If not set, ACL will not be set for requests. + * For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html */ public static final String S3FILEIO_ACL = "s3fileio.acl"; - - static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024; - static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024; - static final double DEFAULT_MULTIPART_THRESHOLD = 1.5; - private String s3FileIoSseType; private String s3FileIoSseKey; private String s3FileIoSseMd5; @@ -138,8 +148,8 @@ public AwsProperties() { this.s3FileIoAcl = null; this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors(); - this.s3FileIoMultiPartSize = DEFAULT_MULTIPART_SIZE; - this.s3FileIoMultipartThresholdFactor = DEFAULT_MULTIPART_THRESHOLD; + this.s3FileIoMultiPartSize = S3FILEIO_MULTIPART_SIZE_DEFAULT; + this.s3FileIoMultipartThresholdFactor = S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT; this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir"); this.glueCatalogId = null; @@ -163,16 +173,21 @@ public AwsProperties(Map properties) { this.s3FileIoMultipartUploadThreads = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_UPLOAD_THREADS, Runtime.getRuntime().availableProcessors()); - this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE, - DEFAULT_MULTIPART_SIZE); + try { + this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE, + S3FILEIO_MULTIPART_SIZE_DEFAULT); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Input malformed or exceeded maximum multipart upload size 5GB: " + + properties.get(S3FILEIO_MULTIPART_SIZE), e); + } this.s3FileIoMultipartThresholdFactor = PropertyUtil.propertyAsDouble(properties, - S3FILEIO_MULTIPART_THRESHOLD_FACTOR,
S3FILEIO_MULTIPART_THRESHOLD_FACTOR_DEFAULT); Preconditions.checkArgument(s3FileIoMultipartThresholdFactor >= 1.0, "Multipart threshold factor must be >= to 1.0"); - Preconditions.checkArgument(s3FileIoMultiPartSize >= MIN_MULTIPART_UPLOAD_SIZE, + Preconditions.checkArgument(s3FileIoMultiPartSize >= S3FILEIO_MULTIPART_SIZE_MIN, "Minimum multipart upload object size must be larger than 5 MB."); this.s3fileIoStagingDirectory = PropertyUtil.propertyAsString(properties, S3FILEIO_STAGING_DIRECTORY, @@ -228,18 +243,34 @@ public int s3FileIoMultipartUploadThreads() { return s3FileIoMultipartUploadThreads; } + public void setS3FileIoMultipartUploadThreads(int threads) { + this.s3FileIoMultipartUploadThreads = threads; + } + public int s3FileIoMultiPartSize() { return s3FileIoMultiPartSize; } + public void setS3FileIoMultiPartSize(int size) { + this.s3FileIoMultiPartSize = size; + } + public double s3FileIOMultipartThresholdFactor() { return s3FileIoMultipartThresholdFactor; } + public void setS3FileIoMultipartThresholdFactor(double factor) { + this.s3FileIoMultipartThresholdFactor = factor; + } + public String getS3fileIoStagingDirectory() { return s3fileIoStagingDirectory; } + public void setS3fileIoStagingDirectory(String directory) { + this.s3fileIoStagingDirectory = directory; + } + public ObjectCannedACL s3FileIoAcl() { return this.s3FileIoAcl; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index edcd87ae2fe7..469f93532b82 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -33,8 +33,10 @@ /** * FileIO implementation backed by S3. + *

* Locations used must follow the conventions for S3 URIs (e.g. s3://bucket/path...). - * See {@link S3URI#VALID_SCHEMES} for the list of supported S3 URI schemes. + * URIs with schemes s3a, s3n, https are also treated as s3 file paths. + * Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}. */ public class S3FileIO implements FileIO { private final SerializableSupplier s3; diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index ed4959fa5df0..3cfb007e242d 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -85,6 +85,7 @@ class S3OutputStream extends PositionOutputStream { private long pos = 0; private boolean closed = false; + @SuppressWarnings({"StaticAssignmentInConstructor", "StaticGuardedByInstance"}) S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) throws IOException { if (executorService == null) { synchronized (this) { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java index 83c2dfd6009f..c44ce2913c7d 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.s3.model.ServerSideEncryption; import software.amazon.awssdk.services.s3.model.UploadPartRequest; +@SuppressWarnings("UnnecessaryLambda") public class S3RequestUtil { private static final Function NULL_SSE_SETTER = sse -> null; diff --git a/aws/src/test/java/org/apache/iceberg/aws/AwsPropertiesTest.java b/aws/src/test/java/org/apache/iceberg/aws/AwsPropertiesTest.java index 069da2635f93..18fb1c4f4641 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/AwsPropertiesTest.java +++ 
b/aws/src/test/java/org/apache/iceberg/aws/AwsPropertiesTest.java @@ -67,4 +67,34 @@ public void testS3FileIoAcl_unknownType() { () -> new AwsProperties(map)); } + @Test + public void testS3MultipartSizeTooSmall() { + Map map = Maps.newHashMap(); + map.put(AwsProperties.S3FILEIO_MULTIPART_SIZE, "1"); + AssertHelpers.assertThrows("should not accept small part size", + IllegalArgumentException.class, + "Minimum multipart upload object size must be larger than 5 MB", + () -> new AwsProperties(map)); + } + + @Test + public void testS3MultipartSizeTooLarge() { + Map map = Maps.newHashMap(); + map.put(AwsProperties.S3FILEIO_MULTIPART_SIZE, "5368709120"); // 5GB + AssertHelpers.assertThrows("should not accept too big part size", + IllegalArgumentException.class, + "Input malformed or exceeded maximum multipart upload size 5GB", + () -> new AwsProperties(map)); + } + + @Test + public void testS3MultipartThresholdFactorLessThanOne() { + Map map = Maps.newHashMap(); + map.put(AwsProperties.S3FILEIO_MULTIPART_THRESHOLD_FACTOR, "0.9"); + AssertHelpers.assertThrows("should not accept factor less than 1", + IllegalArgumentException.class, + "Multipart threshold factor must be >= to 1.0", + () -> new AwsProperties(map)); + } + }