Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import org.apache.hadoop.classification.InterfaceStability;

/**
* Stream that abort the upload.
*/
@InterfaceStability.Unstable
public interface Abortable {

/**
* Abort the upload for the stream.
*
* This is to provide ability to cancel the write on stream; once stream is
* aborted, it should behave as the write was never happened.
*/
void abort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
IOStatisticsSource, Abortable {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
Expand Down Expand Up @@ -170,4 +170,14 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}

@Override
public void abort() {
try {
((Abortable)wrappedStream).abort();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("the wrapped stream does " +
"not support aborting stream.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public interface StreamCapabilities {
*/
String IOSTATISTICS = "iostatistics";

/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
*/
String ABORTABLE = "abortable";

/**
* Capabilities that a stream can support and be queried for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.Abortable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reposition in org.apache.* block below.

import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -74,7 +75,7 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream extends OutputStream implements
StreamCapabilities, IOStatisticsSource {
StreamCapabilities, IOStatisticsSource, Abortable {

private static final Logger LOG =
LoggerFactory.getLogger(S3ABlockOutputStream.class);
Expand Down Expand Up @@ -541,6 +542,10 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.IOSTATISTICS:
return true;

// S3A supports abort.
case StreamCapabilities.ABORTABLE:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge both case statements

return true;

default:
return false;
}
Expand All @@ -551,6 +556,24 @@ public IOStatistics getIOStatistics() {
return iostatistics;
}

@Override
public void abort() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating comment in HeartSaVioR@63c5588#r46507150

the new "cloud ready" API calls always return a CompletableFuture, to emphasise that the op may take time and to allow the caller to do something while waiting. Would we want to do this here? I'm not convinced it is appropriate. Instead we say

  1. call must guarantee that after this is invoked,. close() will not materialize the file at its final path
  2. it may communicate with the store to cancel an operation; which may retry. Errors will be stored.
  3. there may still/also be async IO to the store after the call returns, but this must maintain the requirement "not visible"
  4. And close() may do some IO to cancel

if (closed.getAndSet(true)) {
// already closed
LOG.debug("Ignoring abort() as stream is already closed");
return;
}

S3ADataBlocks.DataBlock block = getActiveBlock();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating comment in HeartSaVioR@63c5588#r46506593

I think we are going to have to worry about this a bit more, because we may have queued >1 block for upload in a separate thread. They'll maybe need interruption, or at least, when they finish, see if they should immediately cancel the upload. This won't make any difference in the semantics of abort() (the final upload has been killed), I just don't want to run up any bills.

try {
if (multiPartUpload != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering what happens in case of non multipart upload

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it,we are closing the stream before only.

multiPartUpload.abort();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating comment in HeartSaVioR@63c5588#r46506765

ok, don't worry so much about my prev comment. That cancels all the outstanding futures.

}
} finally {
cleanupWithLogger(LOG, block, blockFactory);
}
}

/**
* Multiple partition upload.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
Expand Down Expand Up @@ -155,4 +156,22 @@ public void testMarkReset() throws Throwable {
markAndResetDatablock(createFactory(getFileSystem()));
}

@Test
public void testAbortAfterWrite() throws Throwable {
Path dest = path("testAbortAfterWrite");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use getMethodName()

describe(" testAbortAfterWrite");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: A bit more explanatory.

FileSystem fs = getFileSystem();
FSDataOutputStream stream = fs.create(dest, true);
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
try {
stream.write(data);
stream.abort();
// the path should not exist
ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
} finally {
IOUtils.closeStream(stream);
// check the path doesn't exist "after" closing stream
ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,30 @@ public void testWriteOperationHelperPartLimits() throws Throwable {
() -> woh.newUploadPartRequest(key,
"uploadId", 50000, 1024, inputStream, null, 0L));
}

static class StreamClosedException extends IOException {}

@Test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating comment in HeartSaVioR@63c5588#r46506646

tests are good. We will need to do an ITest too, which can be done in ITestS3ABlockOutputArray

public void testStreamClosedAfterAbort() throws Exception {
stream.abort();

// This verification replaces testing various operations after calling
// abort: after calling abort, stream is closed like calling close().
intercept(IOException.class, () -> stream.checkOpen());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating comment in HeartSaVioR@63c5588#r46507220

should also verify that stream.write() raises an IOE. We could raise a subclass of IOE to indicate this was a checkOpen failure for a stricter test


// check that calling write() will call checkOpen() and throws exception
doThrow(new StreamClosedException()).when(stream).checkOpen();

intercept(StreamClosedException.class,
() -> stream.write(new byte[] {'a', 'b', 'c'}));
}

@Test
public void testCallingCloseAfterCallingAbort() throws Exception {
stream.abort();

// This shouldn't throw IOException like calling close() multiple times.
// This will ensure abort() can be called with try-with-resource.
stream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.io.File;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reposition in org.apache.* block below.

import org.assertj.core.api.Assertions;
import org.junit.Test;

Expand Down Expand Up @@ -118,4 +121,29 @@ public void testCommitLimitFailure() throws Throwable {
describedAs("commit abort count")
.isEqualTo(initial + 1);
}

@Test
public void testAbortAfterTwoPartUpload() throws Throwable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc or describe() to explain the test.

Path file = path(getMethodName());

byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');

FileSystem fs = getFileSystem();
FSDataOutputStream stream = fs.create(file, true);
try {
stream.write(data);

// From testTwoPartUpload() we know closing stream will finalize uploads
// and materialize the path. Here we call abort() to abort the upload,
// and ensure the path is NOT available. (uploads are aborted)

stream.abort();
// the path should not exist
assertPathDoesNotExist("upload must not have completed", file);
} finally {
IOUtils.closeStream(stream);
// check the path doesn't exist "after" closing stream
assertPathDoesNotExist("upload must not have completed", file);
}
}
}