diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3GuardCommitter.java
index 9cc0b838e7b51..af6419840b034 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3GuardCommitter.java
@@ -38,7 +38,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -490,7 +490,7 @@ protected List<SinglePendingCommit> loadMultiplePendingCommitFiles(
         .run(new Tasks.Task<FileStatus, IOException>() {
           @Override
           public void run(FileStatus pendingCommitFile) throws IOException {
-            Pendingset commits = Pendingset.load(
+            PendingSet commits = PendingSet.load(
                 fs, pendingCommitFile.getPath());
             pending.addAll(commits.getCommits());
           }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 9037d076e2818..751439c2784e8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -33,6 +33,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -43,7 +44,7 @@
 import org.apache.hadoop.fs.s3a.S3ALambda;
 import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
-import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
 
@@ -199,14 +200,14 @@ public List<LocatedFileStatus> locateAllSinglePendingCommits(
    * not load/validate.
    * @throws IOException on a failure to list the files.
    */
-  public Pair<Pendingset,
+  public Pair<PendingSet,
       List<Pair<LocatedFileStatus, IOException>>>
       loadSinglePendingCommits(Path pendingDir, boolean recursive)
       throws IOException {
     List<LocatedFileStatus> statusList = locateAllSinglePendingCommits(
         pendingDir, recursive);
-    Pendingset commits = new Pendingset(
+    PendingSet commits = new PendingSet(
         statusList.size());
     List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1);
     for (LocatedFileStatus status : statusList) {
@@ -214,7 +215,7 @@ public List<LocatedFileStatus> locateAllSinglePendingCommits(
         commits.add(SinglePendingCommit.load(fs, status.getPath()));
       } catch (IOException e) {
         LOG.warn("Failed to load commit file {}", status.getPath(), e);
-        failures.add(new Pair<>(status, e));
+        failures.add(Pair.of(status, e));
       }
     }
     return Pair.of(commits, failures);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java
index 5f569961474df..97e71ed10fb32 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -77,27 +78,13 @@ private CommitUtils() {
    * @throws IllegalArgumentException if the path is invalid -relative, empty...
    */
  public static List<String> splitPathToElements(Path path) {
-    String uriPath = path.toUri().getPath();
-    checkArgument(!uriPath.isEmpty(), "empty path");
-    checkArgument(uriPath.charAt(0) == '/', "path is relative");
-    if ("/".equals(uriPath)) {
-      // special case: empty list
-      return new ArrayList<>(0);
-    }
-    List<String> elements = new ArrayList<>();
-    int len = uriPath.length();
-    int firstElementChar = 1;
-    int endOfElement = uriPath.indexOf('/', firstElementChar);
-    while (endOfElement > 0) {
-      elements.add(uriPath.substring(firstElementChar, endOfElement));
-      firstElementChar = endOfElement + 1;
-      endOfElement = firstElementChar == len ? -1
-          : uriPath.indexOf('/', firstElementChar);
-    }
-    // expect a possible child element here
-    if (firstElementChar != len) {
-      elements.add(uriPath.substring(firstElementChar));
+    checkArgument(!path.toString().isEmpty(), "empty path");
+    checkArgument(path.isAbsolute(), "path is relative");
+    if ("/".equals(path.toString())) {
+      return Collections.emptyList();
     }
+    List<String> elements = Arrays.asList(path.toString()
+        .substring(1).split("/"));
     return elements;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Pair.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Pair.java
deleted file mode 100644
index 81270c29f9926..0000000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Pair.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit;
-
-/**
- * A typed tuple.
- * Accessor names are consistent with those of Scala, which explains why
- * they aren't that intuitive.
- * @param <L> left element type
- * @param <R> right element type
- */
-public class Pair<L, R> {
-  private final L first;
-  private final R second;
-
-  /**
-   * Create an instance.
-   * @param first first element
-   * @param second second
-   * @param <L> type of first element.
-   * @param <R> type of second element
-   * @return a pair instance
-   */
-  public static <L, R> Pair<L, R> of(L first, R second) {
-    return new Pair<>(first, second);
-  }
-
-  public Pair(L first, R second) {
-    this.first = first;
-    this.second = second;
-  }
-
-  /**
-   * Get the first element.
-   * @return the first element
-   */
-  public L _1() {
-    return first;
-  }
-
-  /**
-   * Get the second element.
-   * @return the second element.
-   */
-  public R _2() {
-    return second;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder("(");
-    sb.append(first);
-    sb.append(", ").append(second);
-    sb.append(')');
-    return sb.toString();
-  }
-}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
index b15ce3bc9c3c4..9b3bda551e8cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.commit;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -28,7 +29,6 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,8 +152,8 @@ public boolean run(Task<I, E> task) throws E {
 
   private boolean runSingleThreaded(Task<I, E> task) throws E {
-    List<I> succeeded = Lists.newArrayList();
-    List<Throwable> exceptions = Lists.newArrayList();
+    List<I> succeeded = new ArrayList<>();
+    List<Throwable> exceptions = new ArrayList<>();
 
     Iterator<I> iterator = items.iterator();
     boolean threw = true;
@@ -238,7 +238,7 @@ private boolean runParallel(final Task<I, E> task)
     final AtomicBoolean abortFailed = new AtomicBoolean(false);
     final AtomicBoolean revertFailed = new AtomicBoolean(false);
 
-    List<Future<?>> futures = Lists.newArrayList();
+    List<Future<?>> futures = new ArrayList<>();
 
     for (final I item : items) {
       // submit a task for each item that will either run or abort the task
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/Pendingset.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
similarity index 88%
rename from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/Pendingset.java
rename to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
index fe838fa67d9b2..6a00613abbcf7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/Pendingset.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -47,8 +47,8 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class Pendingset extends PersistentCommitData {
-  private static final Logger LOG = LoggerFactory.getLogger(Pendingset.class);
+public class PendingSet extends PersistentCommitData {
+  private static final Logger LOG = LoggerFactory.getLogger(PendingSet.class);
 
   /**
    * Supported version value: {@value}.
@@ -74,14 +74,14 @@ public class Pendingset extends PersistentCommitData {
   /**
    * Any custom extra data committer subclasses may choose to add.
    */
-  private Map<String, String> extraData = new HashMap<>(0);
+  private final Map<String, String> extraData = new HashMap<>(0);
 
-  public Pendingset() {
+  public PendingSet() {
     this(0);
   }
 
-  public Pendingset(int size) {
+  public PendingSet(int size) {
     commits = new ArrayList<>(size);
   }
 
@@ -89,8 +89,8 @@ public Pendingset(int size) {
    * Get a JSON serializer for this class.
    * @return a serializer.
    */
-  public static JsonSerialization<Pendingset> serializer() {
-    return new JsonSerialization<>(Pendingset.class, false, true);
+  public static JsonSerialization<PendingSet> serializer() {
+    return new JsonSerialization<>(PendingSet.class, false, true);
   }
 
   /**
@@ -101,10 +101,10 @@ public static JsonSerialization<Pendingset> serializer() {
    * @throws IOException IO failure
    * @throws ValidationFailure if the data is invalid
    */
-  public static Pendingset load(FileSystem fs, Path path)
+  public static PendingSet load(FileSystem fs, Path path)
       throws IOException {
     LOG.debug("Reading pending commits in file {}", path);
-    Pendingset instance = serializer().load(fs, path);
+    PendingSet instance = serializer().load(fs, path);
     instance.validate();
     return instance;
   }
@@ -190,15 +190,4 @@ public List<SinglePendingCommit> getCommits() {
   public void setCommits(List<SinglePendingCommit> commits) {
     this.commits = commits;
   }
-
-  /**
-   * @return any custom extra data committer subclasses may choose to add.
-   */
-  public Map<String, String> getExtraData() {
-    return extraData;
-  }
-
-  public void setExtraData(Map<String, String> extraData) {
-    this.extraData = extraData;
-  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 10018b945dc19..6f27599f3e629 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -25,6 +25,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +38,8 @@
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.CommitUtils;
 import org.apache.hadoop.fs.s3a.commit.DurationInfo;
-import org.apache.hadoop.fs.s3a.commit.Pair;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
-import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -261,7 +261,7 @@ private boolean needsTaskCommit(TaskAttemptContext context,
   public void commitTask(TaskAttemptContext context) throws IOException {
     try (DurationInfo d = new DurationInfo("Commit task %s",
         context.getTaskAttemptID())) {
-      Pendingset commits = innerCommitTask(context);
+      PendingSet commits = innerCommitTask(context);
       LOG.info("Task {} committed {} files", context.getTaskAttemptID(),
           commits.size());
     } catch (IOException e) {
@@ -284,28 +284,28 @@ public void commitTask(TaskAttemptContext context) throws IOException {
    * @return the summary file
    * @throws IOException exception
    */
-  private Pendingset innerCommitTask(
+  private PendingSet innerCommitTask(
       TaskAttemptContext context) throws IOException {
     Path taskAttemptPath = getTaskAttemptPath(context);
     // load in all pending commits.
     CommitOperations actions = getCommitOperations();
-    Pair<Pendingset, List<Pair<LocatedFileStatus, IOException>>>
+    Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
         loaded = actions.loadSinglePendingCommits(
             taskAttemptPath, true);
-    Pendingset pendingset = loaded._1();
-    List<Pair<LocatedFileStatus, IOException>> failures = loaded._2();
+    PendingSet pendingSet = loaded.getKey();
+    List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
     if (!failures.isEmpty()) {
       // At least one file failed to load
       // revert all which did; report failure with first exception
       LOG.error("At least one commit file could not be read: failing");
-      abortPendingUploads(context, pendingset.getCommits(),
+      abortPendingUploads(context, pendingSet.getCommits(),
           true);
-      throw failures.get(0)._2();
+      throw failures.get(0).getValue();
     }
     // patch in IDs
     String jobId = String.valueOf(context.getJobID());
     String taskId = String.valueOf(context.getTaskAttemptID());
-    for (SinglePendingCommit commit : pendingset.getCommits()) {
+    for (SinglePendingCommit commit : pendingSet.getCommits()) {
       commit.setJobId(jobId);
       commit.setTaskId(taskId);
     }
@@ -317,15 +317,15 @@ private Pendingset innerCommitTask(
         CommitConstants.PENDINGSET_SUFFIX);
     LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
     try {
-      pendingset.save(getDestFS(), taskOutcomePath, false);
+      pendingSet.save(getDestFS(), taskOutcomePath, false);
     } catch (IOException e) {
       LOG.warn("Failed to save task commit data to {} ",
           taskOutcomePath, e);
-      abortPendingUploads(context, pendingset.getCommits(),
+      abortPendingUploads(context, pendingSet.getCommits(),
           true);
       throw e;
     }
-    return pendingset;
+    return pendingSet;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 9331969923407..c6ca011dd1baa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -46,7 +46,7 @@
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.DurationInfo;
 import org.apache.hadoop.fs.s3a.commit.Tasks;
-import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -782,7 +782,7 @@ protected int commitTaskInternal(final TaskAttemptContext context,
     // before the uploads, report some progress
     context.progress();
 
-    Pendingset pendingCommits = new Pendingset(commitCount);
+    PendingSet pendingCommits = new PendingSet(commitCount);
     try {
       Tasks.foreach(taskOutput)
           .stopOnFailure()
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
index fcc097a666182..8e131016f8a63 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
@@ -635,9 +635,6 @@ of the upload, which
 make this algorithm viable.
 
 ## The Netfix "Staging" committer
 
-This committer was donated to the ASF by Ryan Blue of netflix
-e name of this committer is
-
 Ryan Blue, of Netflix, has submitted an alternate committer, one which
 has a number of appealing features
@@ -870,7 +867,7 @@ Development on this committer was developed before Netflix donated their committ
 Work has since focused on the staging committer, certainly for an initial release.
 
 Ignoring the changes to the output stream and "magic" path handling, the
-actual committer code is now every common:
+actual committer code is now refactored to have a lot of code in common.
 
 By making changes to the `S3AFileSystem` and the `S3ABlockOutputStream`, this
 committer manages to postpone the completion of writes of all files written to special
@@ -885,7 +882,7 @@ writes are instantiated.
 
 This algorithm uses a modified `S3ABlockOutputStream` Output stream, which,
 rather than commit any active multipart upload in the final `close()` operation,
-it insteads save enough information into the S3 repository for an independent
+it instead saves enough information into the S3 repository for an independent
 process to be able to complete or abort the upload.
 
 Originally, in `OutputStream.close()`, it chose whether to perform a single PUT or to
@@ -953,7 +950,7 @@ to the job attempt.
 
 1. These are merged into to a single `Pendingset` structure.
 1. Which is saved to a `.pendingset` file in the job attempt directory.
 1. Finally, the task attempt directory is deleted. In the example, this
-would be to `/results/latest/__magic/job400_1/task_01_01.pendingset`;
+would be to `/results/latest/__magic/job400_1/task_01_01.pendingset`;
 
 A failure to load any of the single pending upload files (i.e. the file
@@ -1027,7 +1024,7 @@ Pending uploads will remain, but no changes will be visible.
 If the `.pendingset` file has been saved to the job attempt directory, the
 task has effectively committed, it has just failed to report to the controller.
 This will cause complications during job commit, as there
-may be two task pendingset committing the same files, or committing
+may be two task PendingSet committing the same files, or committing
 files with
 
 *Proposed*: track task ID in pendingsets, recognise duplicates on load
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index fec321595983d..d7c440e18dc22 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -30,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -38,7 +39,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.s3a.commit.Pair;
 import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;
 import org.apache.hadoop.util.Progressable;
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3AHugePendingCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3AHugePendingCommits.java
index f1127295ca338..54b3bebed756b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3AHugePendingCommits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3AHugePendingCommits.java
@@ -25,13 +25,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
 
@@ -111,9 +112,10 @@ public void test_030_postCreationAssertions() throws Throwable {
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
     CommitOperations actions = new CommitOperations(fs);
     Preconditions.checkArgument(jobDir != null, "null pendingDir");
-    Pair<Pendingset, List<Pair<LocatedFileStatus, IOException>>>
+    Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
         results = actions.loadSinglePendingCommits(jobDir, false);
-    for (SinglePendingCommit singlePendingCommit : results._1().getCommits()) {
+    for (SinglePendingCommit singlePendingCommit :
+        results.getKey().getCommits()) {
       actions.commitOrFail(singlePendingCommit);
     }
     timer.end("time to commit %s", pendingDataFile);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java
index 4c4957d9cd94f..c478f8ea6fc96 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java
@@ -98,12 +98,12 @@ protected void maybeCreateSuccessMarker(JobContext context,
 
   public ClientResults getResults() throws IOException {
     MockS3AFileSystem mockFS = (MockS3AFileSystem)getDestS3AFS();
-    return mockFS.getOutcome()._1();
+    return mockFS.getOutcome().getKey();
   }
 
   public ClientErrors getErrors() throws IOException {
     MockS3AFileSystem mockFS = (MockS3AFileSystem) getDestS3AFS();
-    return mockFS.getOutcome()._2();
+    return mockFS.getOutcome().getValue();
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
index c3630ad457e6a..e87a2b5d779f1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
@@ -49,6 +49,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
@@ -58,7 +59,6 @@
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.MiniDFSTestCluster;
-import org.apache.hadoop.fs.s3a.commit.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
index 71c853a04c131..f7ca97542d800 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -44,6 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,8 +54,7 @@
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.commit.files.Pendingset;
-import org.apache.hadoop.fs.s3a.commit.Pair;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -249,7 +249,7 @@ public void testSingleTaskCommit() throws Exception {
     assertEquals("Should name the commits file with the task ID: " + results,
         "task_job_0001_r_000002", stats[0].getPath().getName());
 
-    Pendingset pending = Pendingset.load(dfs,
+    PendingSet pending = PendingSet.load(dfs,
         stats[0].getPath());
     assertEquals("Should have one pending commit", 1, pending.size());
     SinglePendingCommit commit = pending.getCommits().get(0);
@@ -290,7 +290,7 @@ public void testSingleTaskEmptyFileCommit() throws Exception {
     assertEquals("Should name the commits file with the task ID",
         "task_job_0001_r_000002", stats[0].getPath().getName());
 
-    Pendingset pending = Pendingset.load(dfs,
+    PendingSet pending = PendingSet.load(dfs,
         stats[0].getPath());
     assertEquals("Should have one pending commit", 1, pending.size());
   }
@@ -314,7 +314,7 @@ public void testSingleTaskMultiFileCommit() throws Exception {
       "task_job_0001_r_000002", stats[0].getPath().getName());
 
     List<SinglePendingCommit> pending =
-        Pendingset.load(dfs, stats[0].getPath()).getCommits();
+        PendingSet.load(dfs, stats[0].getPath()).getCommits();
     assertEquals("Should have correct number of pending commits",
         files.size(), pending.size());
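
The tuple type used throughout the new code above is org.apache.commons.lang3.tuple.Pair, which replaces the committer's own Pair class (with its Scala-style _1()/_2() accessors) deleted by this patch. The following minimal, self-contained sketch shows the accessor migration; the class name and sample values are illustrative only, not taken from the patch, and it assumes commons-lang3 is on the classpath.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.commons.lang3.tuple.Pair;

    public class PairMigrationExample {
      public static void main(String[] args) {
        // Pair.of(left, right) replaces new Pair<>(left, right) from the deleted class.
        Pair<String, Integer> entry = Pair.of("task_01_01.pendingset", 3);

        // getKey()/getValue() (or getLeft()/getRight()) replace _1()/_2().
        System.out.println(entry.getKey() + " holds " + entry.getValue() + " commits");

        // Same shape as the (status, exception) failure list built in CommitOperations.
        List<Pair<String, IOException>> failures = new ArrayList<>();
        failures.add(Pair.of("s3a://bucket/out/part-0001", new IOException("example")));
        System.out.println("first failure: " + failures.get(0).getValue());
      }
    }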