diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java index 37648b748d179..1179021d0c616 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputFormat.java
@@ -69,39 +69,38 @@ @InterfaceStability.Stable public abstract class InputFormat<K, V> { - /** - * Logically split the set of input files for the job. - * - * Each {@link InputSplit} is then assigned to an individual {@link Mapper} - * for processing. + /** + * Logically split the set of input files for the job. + * + * Each {@link InputSplit} is then assigned to an individual {@link Mapper} + * for processing. + * + * + * Note: The split is a logical split of the inputs and the + * input files are not physically split into chunks. For e.g. a split could be + * <input-file-path, start, offset> tuple. The InputFormat also + * creates the {@link RecordReader} to read the {@link InputSplit}. * -
Note: The split is a logical split of the inputs and the - * input files are not physically split into chunks. For e.g. a split could - * be <input-file-path, start, offset> tuple. The InputFormat - * also creates the {@link RecordReader} to read the {@link InputSplit}. - * * @param context job configuration. * @return an array of {@link InputSplit}s for the job. */ - public abstract - List getSplits(JobContext context - ) throws IOException, InterruptedException; - + public abstract List getSplits(JobContext context) + throws IOException, InterruptedException; + /** * Create a record reader for a given split. The framework will call - * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before - * the split is used. + * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before the + * split is used. + * * @param split the split to be read * @param context the information about the task * @return a new record reader * @throws IOException * @throws InterruptedException */ - public abstract - RecordReader createRecordReader(InputSplit split, - TaskAttemptContext context - ) throws IOException, - InterruptedException; - + public abstract RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java index b16e12729247a..c4ff041c51458 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java @@ -20,24 +20,27 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.HashSet; -import java.util.List; import java.util.HashMap; -import java.util.Set; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; @@ -47,12 +50,13 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NodeBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -82,27 +86,30 @@ @InterfaceAudience.Public @InterfaceStability.Stable public abstract class CombineFileInputFormat - extends FileInputFormat { - + extends FileInputFormat { + private static final Logger LOG = LoggerFactory.getLogger(CombineFileInputFormat.class); - + public static final String SPLIT_MINSIZE_PERNODE = "mapreduce.input.fileinputformat.split.minsize.per.node"; public static final String SPLIT_MINSIZE_PERRACK = "mapreduce.input.fileinputformat.split.minsize.per.rack"; + // ability to limit the size of a single split private long maxSplitSize = 0; private long minSplitSizeNode = 0; private long minSplitSizeRack = 0; - // A pool of input paths filters. A split cannot have blocks from files - // across multiple pools. - private ArrayList pools = new ArrayList(); + /** + * A pool of input paths filters. A split cannot have blocks from files across + * multiple pools. + */ + private ArrayList pools = new ArrayList<>(); + + /** Mapping from a rack name to the set of Nodes in the rack. */ + private HashMap> rackToNodes = new HashMap<>(); - // mapping from a rack name to the set of Nodes in the rack - private HashMap> rackToNodes = - new HashMap>(); /** * Specify the maximum size (in bytes) of each split. Each split is * approximately equal to the specified size. @@ -147,13 +154,9 @@ protected void createPool(List filters) { * A split cannot have files from different pools. */ protected void createPool(PathFilter... filters) { - MultiPathFilter multi = new MultiPathFilter(); - for (PathFilter f: filters) { - multi.add(f); - } - pools.add(multi); + pools.add(new MultiPathFilter(Arrays.asList(filters))); } - + @Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = @@ -171,8 +174,8 @@ public CombineFileInputFormat() { } @Override - public List getSplits(JobContext job) - throws IOException { + public List getSplits(JobContext job) throws IOException { + long minSizeNode = 0; long minSizeRack = 0; long maxSize = 0; @@ -190,42 +193,44 @@ public List getSplits(JobContext job) } else { minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0); } + if (maxSplitSize != 0) { maxSize = maxSplitSize; } else { - maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); + maxSize = + conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); // If maxSize is not configured, a single split will be generated per // node. 
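For orientation, here is a hedged sketch of how a job could set the three knobs this hunk reads. The config key and the SPLIT_MINSIZE_PERNODE/SPLIT_MINSIZE_PERRACK constants come from this file; the byte values are invented. When all three are non-zero, the checks that follow require minSizeNode <= minSizeRack <= maxSize.

```java
// Illustrative configuration only: arbitrary sizes, keys/constants as shown in this hunk.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

public class CombineSplitSizeConfig {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    Configuration conf = job.getConfiguration();
    // Upper bound on a single combined split (128 MB here, arbitrary).
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L << 20);
    // Minimum bytes to combine per node / per rack before blocks fall through
    // to the rack-level and overflow handling.
    conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 32L << 20);
    conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 64L << 20);
  }
}
```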
} - if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { - throw new IOException("Minimum split size pernode " + minSizeNode + - " cannot be larger than maximum split size " + - maxSize); - } - if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) { - throw new IOException("Minimum split size per rack " + minSizeRack + - " cannot be larger than maximum split size " + - maxSize); - } - if (minSizeRack != 0 && minSizeNode > minSizeRack) { - throw new IOException("Minimum split size per node " + minSizeNode + - " cannot be larger than minimum split " + - "size per rack " + minSizeRack); - } + + Preconditions.checkState( + minSizeNode == 0 || maxSize == 0 || minSizeNode <= maxSize, + "Minimum split size per node " + minSizeNode + + " cannot be larger than maximum split size " + maxSize); + + Preconditions.checkState( + minSizeRack == 0 || maxSize == 0 || minSizeRack <= maxSize, + "Minimum split size per rack " + minSizeRack + + " cannot be larger than maximum split size " + maxSize); + + Preconditions.checkState(minSizeRack == 0 || minSizeNode <= minSizeRack, + "Minimum split size per node " + minSizeNode + + " cannot be larger than minimum split size per rack " + + minSizeRack); // all the files in input set List stats = listStatus(job); - List splits = new ArrayList(); - if (stats.size() == 0) { - return splits; + if (stats.isEmpty()) { + return Collections.emptyList(); } + List splits = new ArrayList<>(); + // In one single iteration, process all the paths in a single pool. // Processing one pool at a time ensures that a split contains paths // from a single pool only. for (MultiPathFilter onepool : pools) { - ArrayList myPaths = new ArrayList(); - + ArrayList myPaths = new ArrayList<>(); // pick one input path. If it matches all the filters in a pool, // add it to the output set for (Iterator iter = stats.iterator(); iter.hasNext();) { @@ -244,49 +249,43 @@ public List getSplits(JobContext job) // free up rackToNodes map rackToNodes.clear(); - return splits; + return splits; } /** * Return all the splits in the specified set of paths */ private void getMoreSplits(JobContext job, List stats, - long maxSize, long minSizeNode, long minSizeRack, - List splits) - throws IOException { + long maxSize, long minSizeNode, long minSizeRack, List splits) + throws IOException { + + if (CollectionUtils.isEmpty(stats)) { + return; + } + Configuration conf = job.getConfiguration(); - // all blocks for all the files in input set - OneFileInfo[] files; - // mapping from a rack name to the list of blocks it has - HashMap> rackToBlocks = - new HashMap>(); + HashMap> rackToBlocks = new HashMap<>(); // mapping from a block to the nodes on which it has replicas - HashMap blockToNodes = - new HashMap(); + HashMap blockToNodes = new HashMap<>(); // mapping from a node to the list of blocks that it contains - HashMap> nodeToBlocks = - new HashMap>(); - - files = new OneFileInfo[stats.size()]; - if (stats.size() == 0) { - return; - } + HashMap> nodeToBlocks = new HashMap<>(); // populate all the blocks for all files long totLength = 0; - int i = 0; - for (FileStatus stat : stats) { - files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()), - rackToBlocks, blockToNodes, nodeToBlocks, - rackToNodes, maxSize); - totLength += files[i].getLength(); + + for (final FileStatus stat : stats) { + final OneFileInfo fileInfo = + new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()), + rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes, maxSize); + totLength += fileInfo.getLength(); 
} - createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, - maxSize, minSizeNode, minSizeRack, splits); + + createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, maxSize, + minSizeNode, minSizeRack, splits); } /** @@ -311,30 +310,27 @@ private void getMoreSplits(JobContext job, List stats, */ @VisibleForTesting void createSplits(Map> nodeToBlocks, - Map blockToNodes, - Map> rackToBlocks, - long totLength, - long maxSize, - long minSizeNode, - long minSizeRack, - List splits - ) { - ArrayList validBlocks = new ArrayList(); + Map blockToNodes, + Map> rackToBlocks, long totLength, + long maxSize, long minSizeNode, long minSizeRack, + List splits) { + + ArrayList validBlocks = new ArrayList<>(); long curSplitSize = 0; - + int totalNodes = nodeToBlocks.size(); long totalLength = totLength; Multiset splitsPerNode = HashMultiset.create(); - Set completedNodes = new HashSet(); - + Set completedNodes = new HashSet<>(); + while(true) { for (Iterator>> iter = nodeToBlocks .entrySet().iterator(); iter.hasNext();) { Map.Entry> one = iter.next(); - + String node = one.getKey(); - + // Skip the node if it has previously been marked as completed. if (completedNodes.contains(node)) { continue; @@ -348,14 +344,14 @@ void createSplits(Map> nodeToBlocks, Iterator oneBlockIter = blocksInCurrentNode.iterator(); while (oneBlockIter.hasNext()) { OneBlockInfo oneblock = oneBlockIter.next(); - + // Remove all blocks which may already have been assigned to other // splits. - if(!blockToNodes.containsKey(oneblock)) { + if (!blockToNodes.containsKey(oneblock)) { oneBlockIter.remove(); continue; } - + validBlocks.add(oneblock); blockToNodes.remove(oneblock); curSplitSize += oneblock.length; @@ -379,12 +375,12 @@ void createSplits(Map> nodeToBlocks, // node so that splits are distributed across nodes. break; } - } + if (validBlocks.size() != 0) { // This implies that the last few blocks (or all in case maxSize=0) // were not part of a split. The node is complete. - + // if there were any blocks left over and their combined size is // larger than minSplitNode, then combine them into one split. // Otherwise add them back to the unprocessed pool. It is likely @@ -434,11 +430,11 @@ void createSplits(Map> nodeToBlocks, // if blocks in a rack are below the specified minimum size, then keep them // in 'overflow'. After the processing of all racks is complete, these // overflow blocks will be combined into splits. - ArrayList overflowBlocks = new ArrayList(); - Set racks = new HashSet(); + ArrayList overflowBlocks = new ArrayList<>(); + Set racks = new HashSet<>(); // Process all racks over and over again until there is no more work to do. - while (blockToNodes.size() > 0) { + while (!blockToNodes.isEmpty()) { // Create one split for this rack before moving over to the next rack. // Come back to this rack after creating a single split for each of the @@ -490,12 +486,13 @@ void createSplits(Map> nodeToBlocks, // otherwise, store these blocks into overflow data structure addCreatedSplit(splits, getHosts(racks), validBlocks); } else { - // There were a few blocks in this rack that - // remained to be processed. Keep them in 'overflow' block list. - // These will be combined later. + // There were a few blocks in this rack that + // remained to be processed. Keep them in 'overflow' block list. + // These will be combined later. 
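As a reading aid for the node/rack/overflow handling above, a simplified, self-contained sketch (not the Hadoop code) of the accumulation pattern createSplits() uses: gather blocks until maxSize is reached, emit a split, and only emit a leftover batch if it meets the minimum size.

```java
// Simplified sketch of the accumulate-until-maxSize pattern; sizes are made up.
import java.util.ArrayList;
import java.util.List;

public class SplitAccumulatorSketch {
  static List<List<Long>> combine(List<Long> blockLengths, long maxSize, long minSize) {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long curSize = 0;
    for (long len : blockLengths) {
      current.add(len);
      curSize += len;
      if (maxSize != 0 && curSize >= maxSize) {  // split is large enough: emit it
        splits.add(new ArrayList<>(current));
        current.clear();
        curSize = 0;
      }
    }
    // Leftovers are emitted only if they meet the minimum; the real code would
    // instead push undersized leftovers onto the 'overflow' list for later.
    if (!current.isEmpty() && curSize >= minSize) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args) {
    System.out.println(combine(List.of(64L, 64L, 64L, 32L), 128L, 64L));
    // prints [[64, 64], [64, 32]]
  }
}
```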
overflowBlocks.addAll(validBlocks); } } + curSplitSize = 0; validBlocks.clear(); racks.clear(); @@ -514,9 +511,7 @@ void createSplits(Map> nodeToBlocks, // This might cause an exiting rack location to be re-added, // but it should be ok. - for (int i = 0; i < oneblock.racks.length; i++) { - racks.add(oneblock.racks[i]); - } + racks.addAll(Arrays.asList(oneblock.racks)); // if the accumulated split size exceeds the maximum, then // create this split. @@ -539,22 +534,27 @@ void createSplits(Map> nodeToBlocks, * Create a single split from the list of blocks specified in validBlocks * Add this new split into splitList. */ - private void addCreatedSplit(List splitList, - Collection locations, - ArrayList validBlocks) { + private void addCreatedSplit(List splitList, + Collection locations, List validBlocks) { + // create an input split Path[] fl = new Path[validBlocks.size()]; long[] offset = new long[validBlocks.size()]; long[] length = new long[validBlocks.size()]; - for (int i = 0; i < validBlocks.size(); i++) { - fl[i] = validBlocks.get(i).onepath; - offset[i] = validBlocks.get(i).offset; - length[i] = validBlocks.get(i).length; + + int i = 0; + for (OneBlockInfo validBlock : validBlocks) { + fl[i] = validBlock.onepath; + offset[i] = validBlock.offset; + length[i] = validBlock.length; + i++; } - // add this split to the list that is returned - CombineFileSplit thissplit = new CombineFileSplit(fl, offset, - length, locations.toArray(new String[0])); - splitList.add(thissplit); + + // add this split to the list that is returned + CombineFileSplit thissplit = new CombineFileSplit(fl, offset, length, + locations.toArray(new String[0])); + + splitList.add(thissplit); } /** @@ -568,17 +568,20 @@ public abstract RecordReader createRecordReader(InputSplit split, */ @VisibleForTesting static class OneFileInfo { - private long fileSize; // size of the file - private OneBlockInfo[] blocks; // all blocks in this file - - OneFileInfo(FileStatus stat, Configuration conf, - boolean isSplitable, - HashMap> rackToBlocks, - HashMap blockToNodes, - HashMap> nodeToBlocks, - HashMap> rackToNodes, - long maxSize) - throws IOException { + + /** Size of the file. */ + private long fileSize; + + /** All blocks in this file. 
*/ + private OneBlockInfo[] blocks; + + OneFileInfo(FileStatus stat, Configuration conf, boolean isSplitable, + HashMap> rackToBlocks, + HashMap blockToNodes, + HashMap> nodeToBlocks, + HashMap> rackToNodes, long maxSize) + throws IOException { + this.fileSize = 0; // get block locations from file system @@ -589,35 +592,35 @@ static class OneFileInfo { FileSystem fs = stat.getPath().getFileSystem(conf); locations = fs.getFileBlockLocations(stat, 0, stat.getLen()); } + // create a list of all block and their locations if (locations == null) { blocks = new OneBlockInfo[0]; } else { - if(locations.length == 0 && !stat.isDirectory()) { + if (locations.length == 0 && !stat.isDirectory()) { locations = new BlockLocation[] { new BlockLocation() }; } if (!isSplitable) { // if the file is not splitable, just create the one block with // full file length - if (LOG.isDebugEnabled()) { - LOG.debug("File is not splittable so no parallelization " - + "is possible: " + stat.getPath()); - } + LOG.debug("File is not splittable so no parallelization " + + "is possible: {}", stat.getPath()); + blocks = new OneBlockInfo[1]; fileSize = stat.getLen(); blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize, locations[0].getHosts(), locations[0].getTopologyPaths()); } else { - ArrayList blocksList = new ArrayList( - locations.length); - for (int i = 0; i < locations.length; i++) { - fileSize += locations[i].getLength(); + List blocksList = new ArrayList<>(locations.length); + + for (final BlockLocation blockLocation : locations) { + fileSize += blockLocation.getLength(); // each split can be a maximum of maxSize - long left = locations[i].getLength(); - long myOffset = locations[i].getOffset(); + long left = blockLocation.getLength(); + long myOffset = blockLocation.getOffset(); long myLength = 0; do { if (maxSize == 0) { @@ -634,29 +637,30 @@ static class OneFileInfo { myLength = Math.min(maxSize, left); } } - OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(), - myOffset, myLength, locations[i].getHosts(), - locations[i].getTopologyPaths()); + OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(), myOffset, + myLength, blockLocation.getHosts(), + blockLocation.getTopologyPaths()); left -= myLength; myOffset += myLength; blocksList.add(oneblock); } while (left > 0); } - blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]); + blocks = blocksList.toArray(new OneBlockInfo[0]); } - - populateBlockInfo(blocks, rackToBlocks, blockToNodes, - nodeToBlocks, rackToNodes); + + populateBlockInfo(blocks, rackToBlocks, blockToNodes, nodeToBlocks, + rackToNodes); } } - + @VisibleForTesting static void populateBlockInfo(OneBlockInfo[] blocks, - Map> rackToBlocks, - Map blockToNodes, - Map> nodeToBlocks, - Map> rackToNodes) { + Map> rackToBlocks, + Map blockToNodes, + Map> nodeToBlocks, + Map> rackToNodes) { + for (OneBlockInfo oneblock : blocks) { // add this block to the block --> node locations map blockToNodes.put(oneblock, oneblock.hosts); @@ -673,12 +677,8 @@ static void populateBlockInfo(OneBlockInfo[] blocks, // add this block to the rack --> block map for (int j = 0; j < racks.length; j++) { String rack = racks[j]; - List blklist = rackToBlocks.get(rack); - if (blklist == null) { - blklist = new ArrayList(); - rackToBlocks.put(rack, blklist); - } - blklist.add(oneblock); + rackToBlocks.computeIfAbsent(rack, r -> new ArrayList<>()) + .add(oneblock); if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) { // Add this host to rackToNodes map addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]); @@ 
-686,14 +686,9 @@ static void populateBlockInfo(OneBlockInfo[] blocks, } // add this block to the node --> block map - for (int j = 0; j < oneblock.hosts.length; j++) { - String node = oneblock.hosts[j]; - Set blklist = nodeToBlocks.get(node); - if (blklist == null) { - blklist = new LinkedHashSet(); - nodeToBlocks.put(node, blklist); - } - blklist.add(oneblock); + for (String node : oneblock.hosts) { + nodeToBlocks.computeIfAbsent(node, n -> new LinkedHashSet<>()) + .add(oneblock); } } } @@ -708,7 +703,7 @@ OneBlockInfo[] getBlocks() { } /** - * information about one block from the File System + * Information about one block from the File System. */ @VisibleForTesting static class OneBlockInfo { @@ -718,22 +713,22 @@ static class OneBlockInfo { String[] hosts; // nodes on which this block resides String[] racks; // network topology of hosts - OneBlockInfo(Path path, long offset, long len, - String[] hosts, String[] topologyPaths) { + OneBlockInfo(Path path, long offset, long len, String[] hosts, + String[] topologyPaths) { this.onepath = path; this.offset = offset; this.hosts = hosts; this.length = len; - assert (hosts.length == topologyPaths.length || - topologyPaths.length == 0); + assert (hosts.length == topologyPaths.length + || topologyPaths.length == 0); // if the file system does not have any rack information, then // use dummy rack location. if (topologyPaths.length == 0) { topologyPaths = new String[hosts.length]; for (int i = 0; i < topologyPaths.length; i++) { - topologyPaths[i] = (new NodeBase(hosts[i], - NetworkTopology.DEFAULT_RACK)).toString(); + topologyPaths[i] = + new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK).toString(); } } @@ -741,7 +736,7 @@ static class OneBlockInfo { // component. Strip it. this.racks = new String[topologyPaths.length]; for (int i = 0; i < topologyPaths.length; i++) { - this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation(); + this.racks[i] = new NodeBase(topologyPaths[i]).getNetworkLocation(); } } } @@ -755,15 +750,10 @@ protected BlockLocation[] getFileBlockLocations( } private static void addHostToRack(Map> rackToNodes, - String rack, String host) { - Set hosts = rackToNodes.get(rack); - if (hosts == null) { - hosts = new HashSet(); - rackToNodes.put(rack, hosts); - } - hosts.add(host); + String rack, String host) { + rackToNodes.computeIfAbsent(rack, r -> new HashSet<>()).add(host); } - + private Set getHosts(Set racks) { Set hosts = new HashSet(); for (String rack : racks) { @@ -773,26 +763,17 @@ private Set getHosts(Set racks) { } return hosts; } - + /** - * Accept a path only if any one of filters given in the - * constructor do. + * Accept a path only if any one of filters given in the constructor do. 
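The sentence above states MultiPathFilter's contract. A minimal standalone sketch of the same "accept if any filter accepts" idea follows; the class name is illustrative, not the private Hadoop class, and it mirrors the patch by building the filter list once via Arrays.asList instead of an add() method.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

class AnyOfPathFilter implements PathFilter {
  private final List<PathFilter> filters;

  AnyOfPathFilter(PathFilter... filters) {
    this.filters = Arrays.asList(filters); // built once, no mutable add()
  }

  @Override
  public boolean accept(Path path) {
    for (PathFilter f : filters) {
      if (f.accept(path)) {
        return true; // any single match is enough
      }
    }
    return false;
  }
}
```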
*/ private static class MultiPathFilter implements PathFilter { private List filters; - public MultiPathFilter() { - this.filters = new ArrayList(); - } - public MultiPathFilter(List filters) { this.filters = filters; } - public void add(PathFilter one) { - filters.add(one); - } - public boolean accept(Path path) { for (PathFilter filter : filters) { if (filter.accept(path)) { @@ -803,14 +784,8 @@ public boolean accept(Path path) { } public String toString() { - StringBuffer buf = new StringBuffer(); - buf.append("["); - for (PathFilter f: filters) { - buf.append(f); - buf.append(","); - } - buf.append("]"); - return buf.toString(); + return '[' + filters.stream().map(f -> f.toString()) + .collect(Collectors.joining(",")) + ']'; } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java index 3c00689381803..40aca8d1ff70f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java @@ -21,6 +21,9 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -54,34 +57,31 @@ public class CombineFileSplit extends InputSplit implements Writable { private long totLength; /** - * default constructor + * Default constructor. 
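For reference, a usage sketch of the four-array constructor retained here; the paths, offsets, lengths, and host names are made up for illustration.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineFileSplitDemo {
  public static void main(String[] args) throws Exception {
    Path[] files = { new Path("/data/a.txt"), new Path("/data/b.txt") };
    long[] starts = { 0L, 0L };
    long[] lengths = { 1024L, 2048L };
    String[] locations = { "host1", "host2" };
    CombineFileSplit split = new CombineFileSplit(files, starts, lengths, locations);
    System.out.println(split.getLength()); // 3072: sum of the per-file lengths
  }
}
```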
*/ - public CombineFileSplit() {} - public CombineFileSplit(Path[] files, long[] start, - long[] lengths, String[] locations) { + public CombineFileSplit() { + } + + public CombineFileSplit(Path[] files, long[] start, long[] lengths, + String[] locations) { initSplit(files, start, lengths, locations); } public CombineFileSplit(Path[] files, long[] lengths) { long[] startoffset = new long[files.length]; - for (int i = 0; i < startoffset.length; i++) { - startoffset[i] = 0; - } String[] locations = new String[files.length]; - for (int i = 0; i < locations.length; i++) { - locations[i] = ""; - } + Arrays.fill(locations, ""); initSplit(files, startoffset, lengths, locations); } - - private void initSplit(Path[] files, long[] start, - long[] lengths, String[] locations) { + + private void initSplit(Path[] files, long[] start, long[] lengths, + String[] locations) { this.startoffset = start; this.lengths = lengths; this.paths = files; - this.totLength = 0; + this.totLength = 0L; this.locations = locations; - for(long length : lengths) { + for (long length : lengths) { totLength += length; } } @@ -90,8 +90,8 @@ private void initSplit(Path[] files, long[] start, * Copy constructor */ public CombineFileSplit(CombineFileSplit old) throws IOException { - this(old.getPaths(), old.getStartOffsets(), - old.getLengths(), old.getLocations()); + this(old.getPaths(), old.getStartOffsets(), old.getLengths(), + old.getLocations()); } public long getLength() { @@ -142,17 +142,17 @@ public void readFields(DataInput in) throws IOException { totLength = in.readLong(); int arrLength = in.readInt(); lengths = new long[arrLength]; - for(int i=0; i pathStrList = new ArrayList<>(paths.length); for (int i = 0; i < paths.length; i++) { - if (i == 0 ) { - sb.append("Paths:"); - } - sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] + - "+" + lengths[i]); - if (i < paths.length -1) { - sb.append(","); - } + pathStrList.add( + paths[i].toUri().getPath() + ':' + startoffset[i] + '+' + lengths[i]); } + + sb.append("Paths: [").append(String.join(",", pathStrList)).append(']'); + if (locations != null) { - String locs = ""; - StringBuffer locsb = new StringBuffer(); - for (int i = 0; i < locations.length; i++) { - locsb.append(locations[i] + ":"); - } - locs = locsb.toString(); - sb.append(" Locations:" + locs + "; "); + sb.append(" Locations: [").append(String.join(":", locations)); } + return sb.toString(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java index dbbdc3d44c1bb..908bba49a3796 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java @@ -50,6 +50,7 @@ public class TestCombineSequenceFileInputFormat { LoggerFactory.getLogger(TestCombineSequenceFileInputFormat.class); private static Configuration conf = new Configuration(); private static FileSystem localFs = null; + private static final Random RANDOM = new Random(13L); static { try { @@ -68,10 +69,6 @@ public class 
TestCombineSequenceFileInputFormat { public void testFormat() throws IOException, InterruptedException { Job job = Job.getInstance(conf); - Random random = new Random(); - long seed = random.nextLong(); - random.setSeed(seed); - localFs.delete(workDir, true); FileInputFormat.setInputPaths(job, workDir); @@ -79,7 +76,7 @@ public void testFormat() throws IOException, InterruptedException { final int numFiles = 10; // create files with a variety of lengths - createFiles(length, numFiles, random, job); + createFiles(length, numFiles, RANDOM, job); TaskAttemptContext context = MapReduceTestUtil. createDummyMapTaskAttemptContext(job.getConfiguration()); @@ -87,11 +84,11 @@ public void testFormat() throws IOException, InterruptedException { InputFormat format = new CombineSequenceFileInputFormat(); for (int i = 0; i < 3; i++) { - int numSplits = - random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20)) + 1; - LOG.info("splitting: requesting = " + numSplits); + final int numSplits = + RANDOM.nextInt(length/(SequenceFile.SYNC_INTERVAL/20)) + 1; List splits = format.getSplits(job); - LOG.info("splitting: got = " + splits.size()); + LOG.info("Splitting: requesting = {}; got = {}", numSplits, + splits.size()); // we should have a single split as the length is comfortably smaller than // the block size
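A small sketch of why the test now holds a static Random with a fixed seed (13L, as in the patch): the generated file lengths and requested split counts become reproducible from run to run.

```java
import java.util.Random;

public class FixedSeedDemo {
  private static final Random RANDOM = new Random(13L); // same seed every run

  public static void main(String[] args) {
    // Prints the same three values on every execution.
    for (int i = 0; i < 3; i++) {
      System.out.println(RANDOM.nextInt(100));
    }
  }
}
```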