Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapreduce.JobContext;
Expand Down Expand Up @@ -490,7 +490,7 @@ protected List<SinglePendingCommit> loadMultiplePendingCommitFiles(
.run(new Tasks.Task<FileStatus, IOException>() {
@Override
public void run(FileStatus pendingCommitFile) throws IOException {
Pendingset commits = Pendingset.load(
PendingSet commits = PendingSet.load(
fs, pendingCommitFile.getPath());
pending.addAll(commits.getCommits());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
Expand All @@ -43,7 +44,7 @@
import org.apache.hadoop.fs.s3a.S3ALambda;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;

Expand Down Expand Up @@ -199,22 +200,22 @@ public List<LocatedFileStatus> locateAllSinglePendingCommits(
* not load/validate.
* @throws IOException on a failure to list the files.
*/
public Pair<Pendingset,
public Pair<PendingSet,
List<Pair<LocatedFileStatus, IOException>>>
loadSinglePendingCommits(Path pendingDir, boolean recursive)
throws IOException {

List<LocatedFileStatus> statusList = locateAllSinglePendingCommits(
pendingDir, recursive);
Pendingset commits = new Pendingset(
PendingSet commits = new PendingSet(
statusList.size());
List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1);
for (LocatedFileStatus status : statusList) {
try {
commits.add(SinglePendingCommit.load(fs, status.getPath()));
} catch (IOException e) {
LOG.warn("Failed to load commit file {}", status.getPath(), e);
failures.add(new Pair<>(status, e));
failures.add(Pair.of(status, e));
}
}
return Pair.of(commits, failures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -77,27 +78,13 @@ private CommitUtils() {
* @throws IllegalArgumentException if the path is invalid -relative, empty...
*/
public static List<String> splitPathToElements(Path path) {
String uriPath = path.toUri().getPath();
checkArgument(!uriPath.isEmpty(), "empty path");
checkArgument(uriPath.charAt(0) == '/', "path is relative");
if ("/".equals(uriPath)) {
// special case: empty list
return new ArrayList<>(0);
}
List<String> elements = new ArrayList<>();
int len = uriPath.length();
int firstElementChar = 1;
int endOfElement = uriPath.indexOf('/', firstElementChar);
while (endOfElement > 0) {
elements.add(uriPath.substring(firstElementChar, endOfElement));
firstElementChar = endOfElement + 1;
endOfElement = firstElementChar == len ? -1
: uriPath.indexOf('/', firstElementChar);
}
// expect a possible child element here
if (firstElementChar != len) {
elements.add(uriPath.substring(firstElementChar));
checkArgument(!path.equals(""), "empty path");
checkArgument(path.isAbsolute(), "path is relative");
if ("/".equals(path.toString())) {
return Collections.emptyList();
}
List<String> elements = Arrays.asList(path.toString()
.substring(1).split("/"));
return elements;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.commit;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -28,7 +29,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -152,8 +152,8 @@ public <E extends Exception> boolean run(Task<I, E> task) throws E {

private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
throws E {
List<I> succeeded = Lists.newArrayList();
List<Exception> exceptions = Lists.newArrayList();
List<I> succeeded = new ArrayList();
List<Exception> exceptions = new ArrayList();

Iterator<I> iterator = items.iterator();
boolean threw = true;
Expand Down Expand Up @@ -238,7 +238,7 @@ private <E extends Exception> boolean runParallel(final Task<I, E> task)
final AtomicBoolean abortFailed = new AtomicBoolean(false);
final AtomicBoolean revertFailed = new AtomicBoolean(false);

List<Future<?>> futures = Lists.newArrayList();
List<Future<?>> futures = new ArrayList();

for (final I item : items) {
// submit a task for each item that will either run or abort the task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Pendingset extends PersistentCommitData {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring the fact that case-changing classnames is something very unlikely to take reliably in macos unless you can delete the old one first, I'm unsure about this. Why. It's not really a "set" in terms of java lang collection. At the same time, I'm not that fussy about classnames, though I do want it to stay all lower in the filesystem.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it's two words. Pending and Set. So it's camel cased. Maybe PendingBatch?

The issue about case changing and macos doesn't make sense. The patch wasn't merged yet so no one will have issues with the name change.

private static final Logger LOG = LoggerFactory.getLogger(Pendingset.class);
public class PendingSet extends PersistentCommitData {
private static final Logger LOG = LoggerFactory.getLogger(PendingSet.class);

/**
* Supported version value: {@value}.
Expand All @@ -74,23 +74,23 @@ public class Pendingset extends PersistentCommitData {
/**
* Any custom extra data committer subclasses may choose to add.
*/
private Map<String, String> extraData = new HashMap<>(0);
private final Map<String, String> extraData = new HashMap<>(0);

/**
 * Create an empty pending set; the commit list starts with capacity 0.
 */
public PendingSet() {
  this(0);
}


/**
 * Create a pending set, presizing the commit list.
 * @param size initial capacity of the commit list
 */
public PendingSet(int size) {
  commits = new ArrayList<>(size);
}

/**
* Get a JSON serializer for this class.
* @return a serializer.
*/
public static JsonSerialization<Pendingset> serializer() {
return new JsonSerialization<>(Pendingset.class, false, true);
public static JsonSerialization<PendingSet> serializer() {
return new JsonSerialization<>(PendingSet.class, false, true);
}

/**
Expand All @@ -101,10 +101,10 @@ public static JsonSerialization<Pendingset> serializer() {
* @throws IOException IO failure
* @throws ValidationFailure if the data is invalid
*/
public static Pendingset load(FileSystem fs, Path path)
public static PendingSet load(FileSystem fs, Path path)
throws IOException {
LOG.debug("Reading pending commits in file {}", path);
Pendingset instance = serializer().load(fs, path);
PendingSet instance = serializer().load(fs, path);
instance.validate();
return instance;
}
Expand Down Expand Up @@ -190,15 +190,4 @@ public List<SinglePendingCommit> getCommits() {
public void setCommits(List<SinglePendingCommit> commits) {
this.commits = commits;
}

/**
* @return any custom extra data committer subclasses may choose to add.
*/
public Map<String, String> getExtraData() {
return extraData;
}

public void setExtraData(Map<String, String> extraData) {
this.extraData = extraData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -37,9 +38,8 @@
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.DurationInfo;
import org.apache.hadoop.fs.s3a.commit.Pair;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand Down Expand Up @@ -261,7 +261,7 @@ private boolean needsTaskCommit(TaskAttemptContext context,
public void commitTask(TaskAttemptContext context) throws IOException {
try (DurationInfo d = new DurationInfo("Commit task %s",
context.getTaskAttemptID())) {
Pendingset commits = innerCommitTask(context);
PendingSet commits = innerCommitTask(context);
LOG.info("Task {} committed {} files", context.getTaskAttemptID(),
commits.size());
} catch (IOException e) {
Expand All @@ -284,28 +284,28 @@ public void commitTask(TaskAttemptContext context) throws IOException {
* @return the summary file
* @throws IOException exception
*/
private Pendingset innerCommitTask(
private PendingSet innerCommitTask(
TaskAttemptContext context) throws IOException {
Path taskAttemptPath = getTaskAttemptPath(context);
// load in all pending commits.
CommitOperations actions = getCommitOperations();
Pair<Pendingset, List<Pair<LocatedFileStatus, IOException>>>
Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
loaded = actions.loadSinglePendingCommits(
taskAttemptPath, true);
Pendingset pendingset = loaded._1();
List<Pair<LocatedFileStatus, IOException>> failures = loaded._2();
PendingSet pendingSet = loaded.getKey();
List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
if (!failures.isEmpty()) {
// At least one file failed to load
// revert all which did; report failure with first exception
LOG.error("At least one commit file could not be read: failing");
abortPendingUploads(context, pendingset.getCommits(),
abortPendingUploads(context, pendingSet.getCommits(),
true);
throw failures.get(0)._2();
throw failures.get(0).getValue();
}
// patch in IDs
String jobId = String.valueOf(context.getJobID());
String taskId = String.valueOf(context.getTaskAttemptID());
for (SinglePendingCommit commit : pendingset.getCommits()) {
for (SinglePendingCommit commit : pendingSet.getCommits()) {
commit.setJobId(jobId);
commit.setTaskId(taskId);
}
Expand All @@ -317,15 +317,15 @@ private Pendingset innerCommitTask(
CommitConstants.PENDINGSET_SUFFIX);
LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
try {
pendingset.save(getDestFS(), taskOutcomePath, false);
pendingSet.save(getDestFS(), taskOutcomePath, false);
} catch (IOException e) {
LOG.warn("Failed to save task commit data to {} ",
taskOutcomePath, e);
abortPendingUploads(context, pendingset.getCommits(),
abortPendingUploads(context, pendingSet.getCommits(),
true);
throw e;
}
return pendingset;
return pendingSet;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.DurationInfo;
import org.apache.hadoop.fs.s3a.commit.Tasks;
import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
Expand Down Expand Up @@ -782,7 +782,7 @@ protected int commitTaskInternal(final TaskAttemptContext context,
// before the uploads, report some progress
context.progress();

Pendingset pendingCommits = new Pendingset(commitCount);
PendingSet pendingCommits = new PendingSet(commitCount);
try {
Tasks.foreach(taskOutput)
.stopOnFailure()
Expand Down
Loading