@@ -1925,20 +1925,13 @@
</property>

<property>
- <name>fs.s3a.committer.staging.abort.pending.uploads</name>
+ <name>fs.s3a.committer.abort.pending.uploads</name>
<value>true</value>
<description>
- Should the staging committers abort all pending uploads to the destination
+ Should the committers abort all pending uploads to the destination
directory?

- Changing this if more than one partitioned committer is
- writing to the same destination tree simultaneously; otherwise
- the first job to complete will cancel all outstanding uploads from the
- others. However, it may lead to leaked outstanding uploads from failed
- tasks. If disabled, configure the bucket lifecycle to remove uploads
- after a time period, and/or set up a workflow to explicitly delete
- entries. Otherwise there is a risk that uncommitted uploads may run up
- bills.
+ Set to false if more than one job is writing to the same directory tree.
Review comment:

Committers don't cancel just their own pending uploads?

Contributor Author:
In taskAbort, yes. JobAbort/cleanup is where things are more troublesome, because the job doesn't know which specific task attempts have uploaded data.

With the staging committer, no files are uploaded until task commit, so tasks which fail before that point don't have any pending uploads to cancel.
With the magic committer, because files are written directly to S3, there is more risk of pending uploads accumulating.

I'm not sure about Spark here, but on MR, when a task is considered to have failed, abortTask is called in the AM to abort that specific task attempt; for the magic committer, the task's set of .pending files is determined by listing the task attempt directory, and those uploads are cancelled. If that operation is called reliably, only the current upload is pending.

Of course, if an entire job fails: no cleanup at all.

The best thing to do is simply to tell everyone to have a scheduled cleanup.

FWIW, the most leakage I see in the real world is actually from incomplete S3ABlockOutputStream writes, as those also accrue bills. Everyone needs a lifecycle rule to delete old ones. The sole exception is a bucket our QE team used which (unknown to them) I'd been using to test the scalability of the "hadoop s3guard uploads" command: how well does it work when there are many, many incomplete uploads, and can it still delete them all? If they had a rule, it would screw up my test runs.

</description>
</property>
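The thread above ends with the advice to give every bucket a lifecycle rule that removes stale multipart uploads. Below is a minimal sketch of such a rule using the v1 AWS SDK that hadoop-aws bundles; it is not part of this patch, and the class name, rule ID, and seven-day window are illustrative assumptions.

    import java.util.Collections;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.AbortIncompleteMultipartUpload;
    import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
    import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
    import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;

    // Hypothetical one-off utility, not part of the patch.
    public class AddUploadCleanupRule {
      public static void main(String[] args) {
        String bucket = args[0];   // bucket name supplied by the caller
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        // Abort any multipart upload still incomplete seven days after it was
        // initiated; the empty prefix applies the rule to the whole bucket.
        BucketLifecycleConfiguration.Rule rule =
            new BucketLifecycleConfiguration.Rule()
                .withId("abort-stale-multipart-uploads")
                .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("")))
                .withAbortIncompleteMultipartUpload(
                    new AbortIncompleteMultipartUpload().withDaysAfterInitiation(7))
                .withStatus(BucketLifecycleConfiguration.ENABLED);

        s3.setBucketLifecycleConfiguration(bucket,
            new BucketLifecycleConfiguration(Collections.singletonList(rule)));
      }
    }

The same rule can equally be applied once per bucket from the AWS console or CLI; the point of the discussion is that cleanup happens outside the committers, so a failed job cannot leave billable upload parts behind indefinitely.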

@@ -180,6 +180,8 @@
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
+ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
+ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
@@ -314,9 +316,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
- // this is retained as a placeholder for when new deprecated keys
- // need to be added.
Configuration.DeprecationDelta[] deltas = {
+ new Configuration.DeprecationDelta(
+ FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
+ FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS)
};

if (deltas.length > 0) {
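A minimal, hypothetical check of what the DeprecationDelta above gives users (not part of the patch): it assumes hadoop-aws is on the classpath and that initializing S3AFileSystem runs the static block that calls addDeprecatedKeys().

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical check, not part of the patch.
    public class CommitterKeyDeprecationCheck {
      public static void main(String[] args) throws Exception {
        // Initialize S3AFileSystem so its static block registers the
        // old-to-new key mapping shown in the diff above.
        Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");

        Configuration conf = new Configuration(false);
        // A site configuration that still sets the old, staging-only key...
        conf.set("fs.s3a.committer.staging.abort.pending.uploads", "false");
        // ...is resolved under the new committer-wide key, and Hadoop logs a
        // deprecation warning naming the replacement property.
        System.out.println(conf.get("fs.s3a.committer.abort.pending.uploads"));
        // expected output: false
      }
    }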
@@ -4581,7 +4584,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
*/
@Retries.OnceRaw
void abortMultipartUpload(String destKey, String uploadId) {
- LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
+ LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
getAmazonS3Client().abortMultipartUpload(
new AbortMultipartUploadRequest(getBucket(),
destKey,
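The hunk above sits beside listMultipartUploads(String prefix), the same listing used for cleanup. A hedged sketch (not part of the patch; the helper class and method names are illustrative) of reporting the uploads that would be left behind if a job disables fs.s3a.committer.abort.pending.uploads and then fails:

    import java.io.IOException;
    import java.util.List;

    import com.amazonaws.services.s3.model.MultipartUpload;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;

    // Hypothetical helper; assumes an already-initialized S3AFileSystem
    // bound to the target bucket.
    public final class PendingUploadReport {
      private PendingUploadReport() {
      }

      /** Print every multipart upload still pending under the given key prefix. */
      public static void report(S3AFileSystem fs, String prefix) throws IOException {
        List<MultipartUpload> uploads = fs.listMultipartUploads(prefix);
        for (MultipartUpload upload : uploads) {
          System.out.printf("Pending upload %s on key %s, initiated %s%n",
              upload.getUploadId(), upload.getKey(), upload.getInitiated());
        }
      }
    }

From the command line, the "hadoop s3guard uploads" command mentioned in the thread covers the same ground.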
@@ -131,6 +131,8 @@ protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
*/
void operationRetried(String text, Exception ex, int retries,
boolean idempotent) {
+ LOG.info("{}: Retried {}: {}", text, retries, ex.toString());
+ LOG.debug("Stack", ex);
owner.operationRetried(text, ex, retries, idempotent);
}

@@ -323,7 +325,9 @@ public CompleteMultipartUploadResult completeMPUwithRetries(
public void abortMultipartUpload(String destKey, String uploadId,
Retried retrying)
throws IOException {
- invoker.retry("Aborting multipart upload", destKey, true,
+ invoker.retry("Aborting multipart upload ID " + uploadId,
+ destKey,
+ true,
retrying,
() -> owner.abortMultipartUpload(
destKey,
@@ -585,7 +589,8 @@ public BulkOperationState initiateOperation(final Path path,
@Retries.RetryTranslated
public UploadPartResult uploadPart(UploadPartRequest request)
throws IOException {
- return retry("upload part",
+ return retry("upload part #" + request.getPartNumber()
+ + " upload ID "+ request.getUploadId(),
request.getKey(),
true,
() -> owner.uploadPart(request));