diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 5ec346ba29af..4008b25792cc 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -337,8 +338,10 @@ private void uploadParts() { } if (thrown != null) { + // Exception observed here will be thrown as part of + // CompletionException + // when we will join completable futures. LOG.error("Failed to upload part: {}", uploadRequest, thrown); - abortUpload(); } }); @@ -349,11 +352,19 @@ private void uploadParts() { private void completeMultiPartUpload() { Preconditions.checkState(closed, "Complete upload called on open stream: " + location); - List completedParts = - multiPartMap.values().stream() - .map(CompletableFuture::join) - .sorted(Comparator.comparing(CompletedPart::partNumber)) - .collect(Collectors.toList()); + List completedParts; + try { + completedParts = + multiPartMap.values().stream() + .map(CompletableFuture::join) + .sorted(Comparator.comparing(CompletedPart::partNumber)) + .collect(Collectors.toList()); + } catch (CompletionException ce) { + // cancel the remaining futures. + multiPartMap.values().forEach(c -> c.cancel(true)); + abortUpload(); + throw ce; + } CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder() diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java index 893f4edd3cba..967db9aadc0d 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java @@ -24,7 +24,6 @@ import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -136,7 +135,7 @@ public void testAbortAfterFailedPartUpload() { .isInstanceOf(mockException.getClass()) .hasMessageContaining(mockException.getMessage()); - verify(s3mock, atLeastOnce()).abortMultipartUpload((AbortMultipartUploadRequest) any()); + verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any()); } @Test @@ -156,7 +155,7 @@ public void testAbortMultipart() { .isInstanceOf(mockException.getClass()) .hasMessageContaining(mockException.getMessage()); - verify(s3mock).abortMultipartUpload((AbortMultipartUploadRequest) any()); + verify(s3mock, times(1)).abortMultipartUpload((AbortMultipartUploadRequest) any()); } @Test