diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 4008b25792cc..7b1d547ce7e5 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -105,6 +105,7 @@ class S3OutputStream extends PositionOutputStream { private long pos = 0; private boolean closed = false; + private Throwable closeFailureException; @SuppressWarnings("StaticAssignmentInConstructor") S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics) @@ -258,6 +259,15 @@ private void newStream() throws IOException { @Override public void close() throws IOException { + + // A failed s3 close removes state that is required for a successful close. + // Any future close on this stream should fail. + if (closeFailureException != null) { + throw new IOException( + "Attempted to close an S3 output stream that failed to close earlier", + closeFailureException); + } + if (closed) { return; } @@ -267,8 +277,10 @@ public void close() throws IOException { try { stream.close(); - completeUploads(); + } catch (Exception e) { + closeFailureException = e; + throw e; } finally { cleanUpStagingFiles(); } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java index 967db9aadc0d..41bc9425eb11 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java @@ -54,6 +54,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,6 +182,24 @@ public void testWriteWithChecksumEnabled() { writeTest(); } + @Test + public void testCloseFailureShouldPersistOnFutureClose() throws IOException { + IllegalStateException mockException = + new IllegalStateException("mock failure to completeUploads on close"); + Mockito.doThrow(mockException) + .when(s3mock) + .putObject(any(PutObjectRequest.class), any(RequestBody.class)); + S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics()); + + Assertions.assertThatThrownBy(stream::close) + .isInstanceOf(mockException.getClass()) + .hasMessageContaining(mockException.getMessage()); + + Assertions.assertThatThrownBy(stream::close) + .isInstanceOf(IOException.class) + .hasCause(mockException); + } + private void writeTest() { // Run tests for both byte and array write paths Stream.of(true, false)