diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Abortable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Abortable.java
new file mode 100644
index 0000000000000..fcf274d66ea09
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Abortable.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implemented by streams that can abort an in-progress upload.
+ */
+@InterfaceStability.Unstable
+public interface Abortable {
+
+  /**
+   * Abort the upload for the stream.
+   *
+   * This provides the ability to cancel the write on the stream; once the
+   * stream is aborted, it should behave as if the write had never happened.
+   */
+  void abort();
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
index 27d164b7d87ba..449ddb2153c28 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
@@ -34,7 +34,7 @@
 @InterfaceStability.Stable
 public class FSDataOutputStream extends DataOutputStream
     implements Syncable, CanSetDropBehind, StreamCapabilities,
-      IOStatisticsSource {
+      IOStatisticsSource, Abortable {
   private final OutputStream wrappedStream;

   private static class PositionCache extends FilterOutputStream {
@@ -170,4 +170,22 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
   public IOStatistics getIOStatistics() {
     return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
   }
+
+  /**
+   * Abort the upload by delegating to the wrapped stream.
+   *
+   * @throws UnsupportedOperationException if the wrapped stream does not
+   *         implement {@link Abortable}.
+   */
+  @Override
+  public void abort() {
+    // Check the type up front rather than catching ClassCastException, so
+    // that a ClassCastException raised inside the wrapped stream's own
+    // abort() is not misreported as "abort not supported".
+    if (!(wrappedStream instanceof Abortable)) {
+      throw new UnsupportedOperationException("the wrapped stream does "
+          + "not support aborting stream.");
+    }
+    ((Abortable) wrappedStream).abort();
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 15ea2ab325c33..c484f6d5b0be7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -76,6 +76,11 @@ public interface StreamCapabilities {
    */
   String IOSTATISTICS = "iostatistics";

+  /**
+   * Stream abort() capability implemented by {@link Abortable#abort()}.
+   */
+  String ABORTABLE = "abortable";
+
   /**
    * Capabilities that a stream can support and be queried for.
    */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 5784ab8615e6e..024d1ca5674f6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -37,6 +37,7 @@
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.fs.Abortable;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -74,7 +75,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class S3ABlockOutputStream extends OutputStream implements
-    StreamCapabilities, IOStatisticsSource {
+    StreamCapabilities, IOStatisticsSource, Abortable {

   private static final Logger LOG =
       LoggerFactory.getLogger(S3ABlockOutputStream.class);
@@ -541,6 +542,10 @@ public boolean hasCapability(String capability) {
     case StreamCapabilities.IOSTATISTICS:
       return true;

+      // S3A supports abort.
+    case StreamCapabilities.ABORTABLE:
+      return true;
+
     default:
       return false;
     }
@@ -551,6 +556,30 @@ public IOStatistics getIOStatistics() {
     return iostatistics;
   }

+  /**
+   * Abort the write. If the stream has already been marked closed the call
+   * is logged at debug level and ignored; otherwise any multipart upload in
+   * progress is aborted, and the active block and block factory are then
+   * passed to {@code cleanupWithLogger} whether or not that abort threw.
+   */
+  @Override
+  public void abort() {
+    if (closed.getAndSet(true)) {
+      // already closed
+      LOG.debug("Ignoring abort() as stream is already closed");
+      return;
+    }
+
+    S3ADataBlocks.DataBlock block = getActiveBlock();
+    try {
+      if (multiPartUpload != null) {
+        multiPartUpload.abort();
+      }
+    } finally {
+      cleanupWithLogger(LOG, block, blockFactory);
+    }
+  }
+
   /**
    * Multiple partition upload.
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
index 88e0cef2a34aa..21ec3c4b8e3e8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -20,6 +20,7 @@

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
@@ -155,4 +156,24 @@ public void testMarkReset() throws Throwable {
     markAndResetDatablock(createFactory(getFileSystem()));
   }

+  @Test
+  public void testAbortAfterWrite() throws Throwable {
+    Path dest = path("testAbortAfterWrite");
+    describe(" testAbortAfterWrite");
+    FileSystem fs = getFileSystem();
+    FSDataOutputStream stream = fs.create(dest, true);
+    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+    try {
+      stream.write(data);
+      stream.abort();
+      // the path should not exist
+      ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
+    } finally {
+      IOUtils.closeStream(stream);
+    }
+    // Checked outside the finally block so that a failure here cannot
+    // replace an exception thrown by write() or abort() above.
+    // The path must still be absent after the stream has been closed.
+    ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
index 284718bd75c12..baa4a542c855a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
@@ -82,4 +82,30 @@ public void testWriteOperationHelperPartLimits() throws Throwable {
         () -> woh.newUploadPartRequest(key,
             "uploadId", 50000, 1024, inputStream, null, 0L));
   }
+
+  static class StreamClosedException extends IOException {}
+
+  @Test
+  public void testStreamClosedAfterAbort() throws Exception {
+    stream.abort();
+
+    // This verification replaces testing various operations after calling
+    // abort: after calling abort, stream is closed like calling close().
+    intercept(IOException.class, () -> stream.checkOpen());
+
+    // check that calling write() will call checkOpen() and throws exception
+    doThrow(new StreamClosedException()).when(stream).checkOpen();
+
+    intercept(StreamClosedException.class,
+        () -> stream.write(new byte[] {'a', 'b', 'c'}));
+  }
+
+  @Test
+  public void testCallingCloseAfterCallingAbort() throws Exception {
+    stream.abort();
+
+    // This shouldn't throw IOException like calling close() multiple times.
+    // This will ensure abort() can be called with try-with-resource.
+    stream.close();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
index 4a348be8db5fa..8878ea1a1f017 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
@@ -20,6 +20,9 @@

 import java.io.File;

+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;

@@ -118,4 +121,31 @@ public void testCommitLimitFailure() throws Throwable {
         describedAs("commit abort count")
         .isEqualTo(initial + 1);
   }
+
+  @Test
+  public void testAbortAfterTwoPartUpload() throws Throwable {
+    Path file = path(getMethodName());
+
+    byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
+
+    FileSystem fs = getFileSystem();
+    FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+
+      // From testTwoPartUpload() we know closing stream will finalize uploads
+      // and materialize the path. Here we call abort() to abort the upload,
+      // and ensure the path is NOT available. (uploads are aborted)
+
+      stream.abort();
+      // the path should not exist
+      assertPathDoesNotExist("upload must not have completed", file);
+    } finally {
+      IOUtils.closeStream(stream);
+    }
+    // Checked outside the finally block so that a failure here cannot
+    // replace an exception thrown by write() or abort() above.
+    // The path must still be absent after the stream has been closed.
+    assertPathDoesNotExist("upload must not have completed", file);
+  }
 }