diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java
index f5f00f17cf5c1..0ba1d2822f2a0 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java
@@ -46,6 +46,33 @@ public void initialize() {}
    */
   public abstract boolean shouldCopy(Path path);
 
+  /**
+   * Predicate to determine whether a file status can be excluded from copy.
+   * The CopyListingFileStatus object carries various attributes, which makes
+   * it convenient for more complex checks.
+   *
+   * The dispatch between the two overloads works as follows:
+   * if supportFileStatus() returns true, shouldCopy(fileStatus) is called;
+   * if supportFileStatus() returns false, shouldCopy(path) is called.
+   *
+   * @param fileStatus a CopyListingFileStatus to be considered for copying
+   * @return boolean, true to copy, false to exclude
+   */
+  public boolean shouldCopy(CopyListingFileStatus fileStatus) {
+    return shouldCopy(fileStatus.getPath());
+  }
+
+  /**
+   * Indicates whether shouldCopy(fileStatus) or shouldCopy(path) should be
+   * used. The default behaviour is to use shouldCopy(path).
+   *
+   * @return true if shouldCopy(fileStatus) should be called, false to fall
+   * back to shouldCopy(path).
+   */
+  public boolean supportFileStatus() {
+    return false;
+  }
+
   /**
    * Public factory method which returns the appropriate implementation of
    * CopyFilter.
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java
new file mode 100644
index 0000000000000..92c28ae658464
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A CopyFilter which checks whether the FileStatus is a file or a directory:
+ * directories are kept and files are filtered out.
+ */
+public class DirCopyFilter extends FileStatusCopyFilter {
+  private static final Logger LOG = LoggerFactory.getLogger(DirCopyFilter.class);
+  private Configuration conf;
+
+  /**
+   * Constructor of DirCopyFilter; it can be instantiated by the CopyFilter#getCopyFilter method.
+   * @param conf Configuration.
+   */
+  public DirCopyFilter(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public boolean shouldCopy(Path path) {
+    try {
+      FileSystem fs = path.getFileSystem(this.conf);
+      if (fs.getFileStatus(path).isDirectory()) {
+        return true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Exception occurred while getting the FileSystem or FileStatus", e);
+    }
+
+    LOG.debug("Skipping {} as it is not a directory", path);
+    return false;
+  }
+
+  @Override
+  public boolean shouldCopy(CopyListingFileStatus fileStatus) {
+    return fileStatus.isDirectory();
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index b05ce10a96f9c..3336a3629dd6a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -92,7 +92,7 @@ private DistCpConstants() {
   /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
   public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED =
       "mapred.total.bytes.expected";
-  /* Total number of paths to copy, includes directories. Unfiltered count */
+  /* Total number of paths to be copied, including directories. */
   public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS =
       "mapred.number.of.records";
   /* If input is based -f <>, file containing the src paths */
@@ -185,7 +185,7 @@ private DistCpConstants() {
   public static final int SPLIT_RATIO_DEFAULT = 2;
 
   /**
-   * Constants for NONE file deletion
+   * Constants for NONE file deletion.
    */
   public static final String NONE_PATH_NAME = "/NONE";
   public static final Path NONE_PATH = new Path(NONE_PATH_NAME);
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileStatusCopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileStatusCopyFilter.java
new file mode 100644
index 0000000000000..1806f0fc11924
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileStatusCopyFilter.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+/**
+ * The default implementation of a FileStatus-based CopyFilter.
+ *
+ * Every CopyFilter that wants to use shouldCopy(fileStatus) should be a
+ * subclass of this class.
+ *
+ */
+public abstract class FileStatusCopyFilter extends CopyFilter {
+
+  /**
+   * Always returns true for FileStatusCopyFilter and its subclasses,
+   * to enable shouldCopy(fileStatus).
+   * @return always true - the file-status based overload is always enabled.
+   */
+  @Override
+  public boolean supportFileStatus() {
+    return true;
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 3cb00e0a8d79a..2e0e271c566e2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -517,6 +517,20 @@ protected boolean shouldCopy(Path path) {
     return copyFilter.shouldCopy(path);
   }
 
+  /**
+   * Provides another way to skip the copy of a path; allows for exclusion of files such as
+   * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}.
+   *
+   * shouldCopy(path) and shouldCopy(fileStatus) are mutually exclusive: a filter
+   * should use either shouldCopy(path) or shouldCopy(fileStatus), but not both.
+   *
+   * @param fileStatus - FileStatus being considered for copy while building the file listing
+   * @return - true if the fileStatus should be considered for copy, false otherwise
+   */
+  protected boolean shouldCopy(CopyListingFileStatus fileStatus) {
+    return copyFilter.shouldCopy(fileStatus);
+  }
+
   /** {@inheritDoc} */
   @Override
   protected long getBytesToCopy() {
@@ -662,7 +676,13 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
         DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()),
         fileStatus.getPath());
 
-    if (!shouldCopy(fileStatus.getPath())) {
+    // Check whether the copyFilter uses shouldCopy(fileStatus) or shouldCopy(path):
+    // if supportFileStatus() is true use shouldCopy(fileStatus), otherwise use shouldCopy(path).
+    if (copyFilter.supportFileStatus()) {
+      if (!shouldCopy(fileStatus)) {
+        return;
+      }
+    } else if (!shouldCopy(fileStatus.getPath())) {
       return;
     }
 
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java
new file mode 100644
index 0000000000000..c91d30d607839
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader; +import org.apache.hadoop.tools.CopyListingFileStatus; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.util.DistCpUtils; +import org.apache.hadoop.util.Preconditions; + +/** + * UniformRecordInputFormat extends the InputFormat class, to produce + * record-splits for DistCp. + * It looks at the copy-listing and groups the contents into record-splits such + * that the total-records to be copied for each input split is uniform. + */ +public class UniformRecordInputFormat extends InputFormat { + private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class); + + /** + * Implementation of InputFormat::getSplits(). Returns a list of InputSplits, + * such that the number of records to be copied for all the splits are + * approximately equal. + * @param context JobContext for the job. + * @return The list of uniformly-distributed input-splits. 
+ * @throws IOException Exception Reading split file + * @throws InterruptedException Thread interrupted exception + */ + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + int numSplits = getNumSplits(conf); + if (numSplits == 0){ + return new ArrayList(); + } + + return createSplits(conf, numSplits, getNumberOfRecords(conf)); + } + + private List createSplits(Configuration configuration, int numSplits, long numRecords) + throws IOException { + List splits = new ArrayList(numSplits); + long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits); + if (LOG.isDebugEnabled()) { + LOG.debug("Average records per map: " + nRecordsPerSplit + + ", Number of maps: " + numSplits + ", total records: " + numRecords); + } + + Path listingFilePath = getListingFilePath(configuration); + CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); + Text srcRelPath = new Text(); + long lastPosition = 0L; + long count = 0L; + long remains = numRecords - nRecordsPerSplit * (long) numSplits; + + SequenceFile.Reader reader = null; + try { + reader = getListingFileReader(configuration); + while (reader.next(srcRelPath, srcFileStatus)) { + count++; + + // a split's size must be nRecordsPerSplit or (nRecordsPerSplit + 1) + // the first N (num of remains) splits have a size of (nRecordsPerSplit + 1), + // the others have a size of nRecordsPerSplit + if ((remains > 0 && count % (nRecordsPerSplit + 1) == 0) || + (remains == 0 && count % nRecordsPerSplit == 0)) { + + long currentPosition = reader.getPosition(); + FileSplit split = new FileSplit(listingFilePath, lastPosition, + currentPosition - lastPosition, null); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating split: " + split + ", records in split: " + count); + } + + splits.add(split); + lastPosition = currentPosition; + if (remains > 0) { + remains--; + } + count = 0L; + } + } + + return splits; + } finally { + IOUtils.closeStream(reader); + } + } + + /** + * Implementation of InputFormat::createRecordReader(). + * @param split The split for which the RecordReader is sought. + * @param context The context of the current task-attempt. + * @return A SequenceFileRecordReader instance, (since the copy-listing is a + * simple sequence-file.) 
+ * @throws IOException Exception Reading split file + * @throws InterruptedException Thread interrupted exception + */ + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new SequenceFileRecordReader(); + } + + private static Path getListingFilePath(Configuration configuration) { + String listingFilePathString = + configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); + + Preconditions.checkState(!listingFilePathString.equals(""), + "Listing file doesn't exist at %s", listingFilePathString); + return new Path(listingFilePathString); + } + + private SequenceFile.Reader getListingFileReader(Configuration conf) { + Path listingFilePath = getListingFilePath(conf); + + try { + FileSystem fs = listingFilePath.getFileSystem(conf); + Preconditions.checkState(fs.exists(listingFilePath), + "Listing file doesn't exist at: %s", listingFilePath); + + return new SequenceFile.Reader(conf, + SequenceFile.Reader.file(listingFilePath)); + } catch (IOException | IllegalStateException exception) { + LOG.error("Couldn't read listing file at: {}", listingFilePath, exception); + throw new IllegalStateException("Couldn't read listing file at: " + + listingFilePath, exception); + } + } + + private static long getNumberOfRecords(Configuration configuration) { + return DistCpUtils.getLong(configuration, + DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS); + } + + private static int getNumSplits(Configuration configuration) { + return DistCpUtils.getInt(configuration, MRJobConfig.NUM_MAPS); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml b/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml index 6e1154ef1a9dd..9e7d1416fd1e7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml +++ b/hadoop-tools/hadoop-distcp/src/main/resources/distcp-default.xml @@ -19,6 +19,12 @@ + + distcp.record.strategy.impl + org.apache.hadoop.tools.mapred.UniformRecordInputFormat + Implementation of record input format + + distcp.dynamic.strategy.impl org.apache.hadoop.tools.mapred.lib.DynamicInputFormat diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index 0e11b24529bde..fa03be75e62c1 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -350,7 +350,7 @@ Command Line Options | `-filelimit ` | Limit the total number of files to be <= n | **Deprecated!** Ignored in the new DistCp. | | `-sizelimit ` | Limit the total size to be <= n bytes | **Deprecated!** Ignored in the new DistCp. | | `-delete` | Delete the files existing in the dst but not in src | The deletion is done by FS Shell. So the trash will be used, if it is enable. Delete is applicable only with update or overwrite options. | -| `-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) | +| `-strategy {dynamic\|uniformsize\|uniformrecord}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) 
If "uniformrecord" is used, Maps are balanced on the total number of files copied by each map; this is especially useful when there are lots of empty directories. If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.) |
 | `-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value. |
 | `-atomic {-tmp <tmp_dir>}` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster. |
 | `-async` | Run DistCp asynchronously. Quits as soon as the Hadoop Job is launched. | The Hadoop Job-id is logged, for tracking. |
@@ -456,7 +456,14 @@ $H3 Copy-listing Generator
     `distcp.exclude-file-regex` parameter in "DistCpOptions". Support regular
     expressions specified by java.util.regex.Pattern. This is a more dynamic
     approach as compared to "RegexCopyFilter".
-  3. `distcp.filters.class` to "TrueCopyFilter". This is used as a default
+  3. `distcp.filters.class` to "DirCopyFilter". If this implementation is used,
+     all files are filtered out and only directories remain. In addition, you
+     can override the shouldCopy(CopyListingFileStatus fileStatus) method, which
+     gives you a more flexible way to filter files/dirs by their file statuses.
+     (i.e. a file status carries a lot of useful info such as file path, file
+     length, isDir, modificationTime, owner, etc. You can use them in your own
+     way by extending the FileStatusCopyFilter class.)
+  4. `distcp.filters.class` to "TrueCopyFilter". This is used as a default
     implementation if none of the above options are specified.
 
   The legacy implementation only lists those paths that must definitely be
@@ -486,6 +493,16 @@ $H3 InputFormats and MapReduce Components
     other map. The splitting isn't always perfect, but its trivial
     implementation keeps the setup-time low.
 
+* **UniformRecordInputFormat:**
+    This implementation of org.apache.hadoop.mapreduce.InputFormat also provides
+    equivalence with Legacy DistCp in balancing load across maps. The aim of
+    the UniformRecordInputFormat is to make each map copy roughly the same number
+    of files/directories. To that end, the listing file is split into groups of paths,
+    such that the number of files/directories in each InputSplit is nearly equal
+    to that of every other map. The splitting isn't always perfect, but its trivial
+    implementation keeps the setup-time low. This implementation is especially
+    useful when there are lots of empty directories to be copied.
+
 * **DynamicInputFormat and DynamicRecordReader:**
     The DynamicInputFormat implements `org.apache.hadoop.mapreduce.InputFormat`,
     and is new to DistCp.
The listing-file is split into several "chunk-files", diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyFilter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyFilter.java index 4d36f3840612c..384631cfc0fad 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyFilter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyFilter.java @@ -58,6 +58,19 @@ public void testGetCopyFilterRegexpInConfigurationFilter() { copyFilter instanceof RegexpInConfigurationFilter); } + @Test + public void testGetDirCopyFilter(){ + final String filterName = + "org.apache.hadoop.tools.DirCopyFilter"; + Configuration configuration = new Configuration(false); + configuration.set(DistCpConstants.CONF_LABEL_FILTERS_CLASS, filterName); + CopyFilter copyFilter = CopyFilter.getCopyFilter(configuration); + assertTrue("copyFilter should be instance of DirCopyFilter", + copyFilter instanceof DirCopyFilter); + assertTrue("copyFilter should be instance of FileStatusCopyFilter too", + copyFilter instanceof FileStatusCopyFilter); + } + @Test public void testGetCopyFilterNonExistingClass() throws Exception { final String filterName = diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDirCopyFilter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDirCopyFilter.java new file mode 100644 index 0000000000000..98b6f79e3d56b --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDirCopyFilter.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestDirCopyFilter { + + private static final String SRCROOT = "/src"; + private static final long BLOCK_SIZE = 1024; + private static MiniDFSCluster cluster; + private static Configuration conf; + private static FileSystem fs; + public static final Path DIR1 = new Path(SRCROOT, "dir1"); + public static final Path FILE0 = new Path(SRCROOT, "file0"); + public static final Path DIR1_FILE1 = new Path(DIR1, "file1"); + public static final Path DIR1_FILE2 = new Path(DIR1, "file2"); + public static final Path DIR1_DIR3 = new Path(DIR1, "dir3"); + public static final Path DIR1_DIR3_DIR4 = new Path(DIR1_DIR3, "dir4"); + public static final Path DIR1_DIR3_DIR4_FILE_3 = new Path(DIR1_DIR3_DIR4, "file1"); + public static final Path[] FILE_PATHS = new Path[]{FILE0, DIR1_FILE1, + DIR1_FILE2, DIR1_DIR3_DIR4_FILE_3}; + public static final Path[] DIR_PATHS = new Path[]{DIR1, DIR1_DIR3, DIR1_DIR3_DIR4}; + @BeforeClass + public static void setupClass() throws IOException { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + + for(Path dirPath : DIR_PATHS){ + createDir(dirPath); + } + + for(Path filePath : FILE_PATHS){ + createFile(filePath); + } + } + + @AfterClass + public static void teardownClass() throws IOException { + if (fs != null){ + fs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + + @Test + public void testSupportFileStatus() { + Configuration configuration = new Configuration(false); + DirCopyFilter dirCopyFilter = new DirCopyFilter(configuration); + assertThat(dirCopyFilter.supportFileStatus()).isTrue(); + } + + + @Test + public void testShouldCopyTrue() throws IOException { + DirCopyFilter copyFilter = new DirCopyFilter(conf); + Path[] dirPaths = new Path[]{DIR1, DIR1_DIR3, DIR1_DIR3_DIR4}; + + for (Path dirPath : dirPaths) { + assertThat(copyFilter.shouldCopy(dirStatus(dirPath))) + .describedAs("should copy dir: " + dirPath) + .isTrue(); + assertThat(copyFilter.shouldCopy(dirStatus(dirPath).getPath())) + .describedAs("should copy dir: " + dirPath) + .isTrue(); + } + } + + @Test + public void testShouldCopyFalse() { + DirCopyFilter copyFilter = new DirCopyFilter(conf); + + + for (Path filePath : FILE_PATHS) { + assertThat(copyFilter.shouldCopy(fileStatus(filePath))) + .describedAs("should copy file: " + filePath) + .isFalse(); + assertThat(copyFilter.shouldCopy(filePath)) + .describedAs("should copy file: " + filePath) + .isFalse(); 
+ } + } + + + private CopyListingFileStatus newStatus(final Path path, + final boolean isDir) { + return new CopyListingFileStatus(new FileStatus(0, isDir, 0, 0, 0, path)); + } + + private CopyListingFileStatus dirStatus(final Path path) throws IOException { + CopyListingFileStatus dirFileStatus = newStatus(path, true); + return dirFileStatus; + } + + private CopyListingFileStatus fileStatus(final Path path) { + CopyListingFileStatus fileStatus = newStatus(path, false); + return fileStatus; + } + + private static void createFile(Path filePath) throws IOException { + fs.createFile(filePath); + Random random = new Random(); + + DataOutputStream outputStream = null; + try { + outputStream = fs.create(filePath, true, 0); + outputStream.write(new byte[random.nextInt((int)BLOCK_SIZE * 2)]); + } finally { + IOUtils.cleanupWithLogger(null, outputStream); + } + } + + private static void createDir(Path fileStatus) throws IOException { + fs.mkdirs(fileStatus); + } + +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index d126bfdc4f975..badee8a8a19de 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -305,6 +305,8 @@ public void testCopyStrategy() { builder.build().getCopyStrategy()); builder.withCopyStrategy("dynamic"); Assert.assertEquals("dynamic", builder.build().getCopyStrategy()); + builder.withCopyStrategy("record"); + Assert.assertEquals("record", builder.build().getCopyStrategy()); } @Test diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 1ffdd89073dec..1beabb2dd6ced 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -393,6 +393,14 @@ public void testCopyStrategy() { "hdfs://localhost:8020/target/"}); assertThat(options.getCopyStrategy()).isEqualTo("dynamic"); + options = OptionsParser.parse(new String[] { + "-strategy", + "record", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + assertThat(options.getCopyStrategy()).isEqualTo("record"); + options = OptionsParser.parse(new String[] { "-f", "hdfs://localhost:8020/source/first", diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java new file mode 100644 index 0000000000000..886330be7d132 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.mapred; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.tools.CopyListing; +import org.apache.hadoop.tools.CopyListingFileStatus; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpContext; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.tools.StubContext; +import org.apache.hadoop.tools.util.DistCpUtils; + +import static org.assertj.core.api.Assertions.assertThat; + + +public class TestUniformRecordInputFormat extends AbstractHadoopTestBase { + private static MiniDFSCluster cluster; + private static final int N_FILES = 20; + private static final int SIZEOF_EACH_FILE = 1024; + private static final Random random = new Random(); + private static int totalFileSize = 0; + + private static final Credentials CREDENTIALS = new Credentials(); + + + @BeforeClass + public static void setupClass() throws Exception { + cluster = new MiniDFSCluster.Builder(new Configuration()) + .numDataNodes(1) + .format(true).build(); + totalFileSize = 0; + + for (int i = 0; i < N_FILES; ++i) + totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE); + } + + private static DistCpOptions getOptions(int nMaps) throws Exception { + Path sourcePath = new Path(cluster.getFileSystem().getUri().toString() + + "/tmp/source"); + Path targetPath = new Path(cluster.getFileSystem().getUri().toString() + + "/tmp/target"); + + List sourceList = new ArrayList(); + sourceList.add(sourcePath); + return new DistCpOptions.Builder(sourceList, targetPath) + .maxMaps(nMaps) + .build(); + } + + private static int createFile(String path, int fileSize) throws Exception { + FileSystem fileSystem = null; + DataOutputStream outputStream = null; + try { + fileSystem = cluster.getFileSystem(); + outputStream = fileSystem.create(new Path(path), true, 0); + int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize); + outputStream.write(new byte[size]); + return size; + } finally { + IOUtils.cleanupWithLogger(null, outputStream); + } + } + + @AfterClass + public static void tearDownClass() { + if (cluster != null) { + cluster.shutdown(); + } + } + + public void 
testGetSplits(int nMaps) throws Exception { + DistCpContext context = new DistCpContext(getOptions(nMaps)); + Configuration configuration = new Configuration(); + configuration.set(MRJobConfig.NUM_MAPS, String.valueOf(context.getMaxMaps())); + Path listFile = new Path(cluster.getFileSystem().getUri().toString() + + "/tmp/testGetSplits_2/fileList.seq"); + CopyListing.getCopyListing(configuration, CREDENTIALS, context) + .buildListing(listFile, context); + + JobContext jobContext = new JobContextImpl(configuration, new JobID()); + UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat(); + List splits + = uniformRecordInputFormat.getSplits(jobContext); + + long totalRecords = DistCpUtils.getLong(configuration, DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS); + long recordPerMap = totalRecords / nMaps; + + + checkSplits(listFile, splits); + + int doubleCheckedTotalSize = 0; + for (int i = 0; i < splits.size(); ++i) { + InputSplit split = splits.get(i); + int currentSplitSize = 0; + RecordReader recordReader = + uniformRecordInputFormat.createRecordReader(split, null); + StubContext stubContext = new StubContext(jobContext.getConfiguration(), + recordReader, 0); + final TaskAttemptContext taskAttemptContext + = stubContext.getContext(); + recordReader.initialize(split, taskAttemptContext); + long recordCnt = 0; + while (recordReader.nextKeyValue()) { + recordCnt++; + Path sourcePath = recordReader.getCurrentValue().getPath(); + FileSystem fs = sourcePath.getFileSystem(configuration); + FileStatus fileStatus[] = fs.listStatus(sourcePath); + // If the fileStatus is a file which fileStatus[] must be 1, + // then add the size of file, otherwise it is a directory skip. + // For all files under the directory will be in the sequence file too. 
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      assertThat(recordCnt)
+          .describedAs("record count")
+          .isGreaterThanOrEqualTo(recordPerMap)
+          .isLessThanOrEqualTo(recordPerMap + 1);
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    assertThat(totalFileSize).isEqualTo(doubleCheckedTotalSize);
+  }
+
+  private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+    long lastEnd = 0;
+
+    // Verify that each split's start matches the previous split's end and
+    // that we are not missing anything.
+    for (InputSplit split : splits) {
+      FileSplit fileSplit = (FileSplit) split;
+      long start = fileSplit.getStart();
+      assertThat(lastEnd)
+          .withFailMessage("The end of the last split does not equal the start of the current split.")
+          .isEqualTo(start);
+      lastEnd = start + fileSplit.getLength();
+    }
+
+    // Verify there is nothing more to read from the input file.
+    try (SequenceFile.Reader reader
+        = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
+            SequenceFile.Reader.file(listFile))) {
+      reader.seek(lastEnd);
+      CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+      Text srcRelPath = new Text();
+      assertThat(reader.next(srcRelPath, srcFileStatus))
+          .withFailMessage("The reader is expected to reach the end of the file, but it doesn't.")
+          .isFalse();
+    }
+  }
+
+  @Test
+  public void testGetSplits() throws Exception {
+    for (int i = 1; i < N_FILES; ++i) {
+      testGetSplits(i);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
index 622e3916799eb..7f14b66f9b8f0 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformSizeInputFormat.java
@@ -51,7 +51,7 @@ public class TestUniformSizeInputFormat {
   private static MiniDFSCluster cluster;
   private static final int N_FILES = 20;
   private static final int SIZEOF_EACH_FILE=1024;
-  private static final Random random = new Random();
+  private static final Random RANDOM_VALUE = new Random();
   private static int totalFileSize = 0;
 
   private static final Credentials CREDENTIALS = new Credentials();
@@ -63,8 +63,10 @@ public static void setup() throws Exception {
         .format(true).build();
     totalFileSize = 0;
 
-    for (int i=0; i
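For illustration only, here is a minimal sketch (not part of the patch) of how a custom filter could build on the FileStatusCopyFilter / shouldCopy(CopyListingFileStatus) hook added above. The class name LargeFileCopyFilter, the configuration key distcp.example.min-file-length, and the 1 MB default are invented for this example; only the CopyListingFileStatus accessors the patch itself relies on (isDirectory(), getLen()) and the `distcp.filters.class` selection mechanism exercised in TestCopyFilter are assumed.

    package org.apache.hadoop.tools;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    /**
     * Example only: keeps directories and files of at least a minimum length,
     * using the attributes already carried by the copy listing instead of
     * issuing extra getFileStatus() calls against the source FileSystem.
     */
    public class LargeFileCopyFilter extends FileStatusCopyFilter {

      /** Hypothetical config key, invented for this example. */
      private static final String MIN_LEN_KEY = "distcp.example.min-file-length";

      private final long minLen;

      public LargeFileCopyFilter(Configuration conf) {
        // The 1 MB default is arbitrary; both key and default are illustrative.
        this.minLen = conf.getLong(MIN_LEN_KEY, 1024L * 1024L);
      }

      @Override
      public boolean shouldCopy(CopyListingFileStatus fileStatus) {
        // Directories are kept so the target tree structure is preserved;
        // files are copied only when they reach the minimum length.
        return fileStatus.isDirectory() || fileStatus.getLen() >= minLen;
      }

      @Override
      public boolean shouldCopy(Path path) {
        // Path-only fallback, required because CopyFilter declares it abstract.
        // It is not invoked while supportFileStatus() returns true.
        return true;
      }
    }

As in testGetDirCopyFilter above, such a filter would be selected by setting `distcp.filters.class` to its fully-qualified class name; because FileStatusCopyFilter#supportFileStatus() returns true, SimpleCopyListing#writeToFileListing would dispatch to shouldCopy(fileStatus) rather than shouldCopy(path).

As a worked example of the record balancing in UniformRecordInputFormat#createSplits: with 23 listing records and 5 maps, nRecordsPerSplit = floor(23 / 5) = 4 and remains = 23 - 4 * 5 = 3, so the first three splits take nRecordsPerSplit + 1 = 5 records each and the last two take 4 each, which is exactly the "size of nRecordsPerSplit or (nRecordsPerSplit + 1)" invariant stated in the code.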