Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ private DistCpConstants() {
public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";

/**
* Enabling distcp -update to use modification time of source and target
* file to check while copying same file with same size but different content.
* Enabling {@code distcp -update} to use modification time of source and
* target file to check while copying same file with same size but
* different content.
*
* The check would verify if the target file is perceived as older than the
* source then it indicates that the source has been recently updated and it
Expand All @@ -155,6 +156,12 @@ private DistCpConstants() {
public static final String CONF_LABEL_UPDATE_MOD_TIME =
"distcp.update.modification.time";

/**
* Default value for 'distcp.update.modification.time' configuration.
*/
public static final boolean CONF_LABEL_UPDATE_MOD_TIME_DEFAULT =
true;

/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME_DEFAULT;

/**
* Mapper class that executes the DistCp copy operation.
* Implements the o.a.h.mapreduce.Mapper interface.
Expand Down Expand Up @@ -74,6 +76,15 @@ static enum FileAction {
OVERWRITE, // Overwrite the whole file
}

/**
* Indicates the checksum comparison result.
*/
public enum ChecksumComparison {
TRUE, // checksum comparison is compatible and true.
FALSE, // checksum comparison is compatible and false.
INCOMPATIBLE, // checksum comparison is not compatible.
}

private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class);

private Configuration conf;
Expand All @@ -85,7 +96,7 @@ static enum FileAction {
private boolean append = false;
private boolean verboseLog = false;
private boolean directWrite = false;
private boolean useModTimeToUpdate = true;
private boolean useModTimeToUpdate;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);

private FileSystem targetFS = null;
Expand Down Expand Up @@ -116,7 +127,8 @@ public void setup(Context context) throws IOException, InterruptedException {
directWrite = conf.getBoolean(
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
useModTimeToUpdate =
conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, true);
conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME,
CONF_LABEL_UPDATE_MOD_TIME_DEFAULT);

targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
Expand Down Expand Up @@ -361,31 +373,60 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
if (sameLength && source.getLen() == 0) {
return true;
}
// if both the source and target have the same length, then check if the
// config to use modification time is set to true, then use the
// modification time and checksum comparison to determine if the copy can
// be skipped else if not set then just use the checksum comparison to
// check copy skip.
// If the src and target file have same size and block size, we would
// check if the checkCrc flag is enabled or not. If enabled, and the
// modTime comparison is enabled then return true if target file is older
// than the source file, since this indicates that the target file is
// recently updated and the source is not changed more recently than the
// update, we can skip the copy else we would copy.
// If skipCrc flag is disabled, we would check the checksum comparison
// which is an enum representing 3 values, of which if the comparison
// returns NOT_COMPATIBLE, we'll try to check modtime again, else return
// the result of checksum comparison which are compatible(true or false).
//
// Note: Different object stores can have different checksum algorithms
// resulting in no checksum comparison that results in return true
// always, having the modification time enabled can help in these
// scenarios to not incorrectly skip a copy. Refer: HADOOP-18596.

if (sameLength && sameBlockSize) {
if (useModTimeToUpdate) {
return
(source.getModificationTime() < target.getModificationTime()) &&
(skipCrc || DistCpUtils.checksumsAreEqual(sourceFS,
source.getPath(), null,
targetFS, target.getPath(), source.getLen()));
if (skipCrc) {
return maybeUseModTimeToCompare(source, target);
} else {
return skipCrc || DistCpUtils
ChecksumComparison checksumComparison = DistCpUtils
.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath(), source.getLen());
LOG.debug("Result of checksum comparison between src {} and target "
+ "{} : {}", source, target, checksumComparison);
if (checksumComparison.equals(ChecksumComparison.INCOMPATIBLE)) {
return maybeUseModTimeToCompare(source, target);
}
// if skipCrc is disabled and checksumComparison is compatible we
// need not check the mod time.
return checksumComparison.equals(ChecksumComparison.TRUE);
}
} else {
return false;
}
return false;
}

/**
* If the mod time comparison is enabled, check the mod time else return
* false.
* Comparison: If the target file perceives to have greater mod time(older)
* than the source file, we can assume that there has been no new changes
* that occurred in the source file, hence we should return true to skip the
* copy of the file.
* @param source Source fileStatus.
* @param target Target fileStatus.
* @return boolean representing result of modTime check.
*/
private boolean maybeUseModTimeToCompare(
CopyListingFileStatus source, FileStatus target) {
if (useModTimeToUpdate) {
return source.getModificationTime() < target.getModificationTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be <= ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, good point.

just thinking if there would ever be a scenario when the source file is updated at the same time as it is synced to a different store, so we can have "=" to skip the copy...

}
// if we cannot check mod time, return true (skip the copy).
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
import org.apache.hadoop.util.StringUtils;

Expand Down Expand Up @@ -568,10 +569,12 @@ public static String getStringDescriptionFor(long nBytes) {
* and false otherwise.
* @throws IOException if there's an exception while retrieving checksums.
*/
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
FileChecksum sourceChecksum,
FileSystem targetFS,
Path target, long sourceLen)
public static CopyMapper.ChecksumComparison checksumsAreEqual(
FileSystem sourceFS,
Path source,
FileChecksum sourceChecksum,
FileSystem targetFS,
Path target, long sourceLen)
throws IOException {
FileChecksum targetChecksum = null;
try {
Expand All @@ -585,8 +588,15 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
} catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
}
return (sourceChecksum == null || targetChecksum == null ||
sourceChecksum.equals(targetChecksum));
// If the source or target checksum is null, that means there is no
// comparison that took place and return not compatible.
// else if matched, return compatible with the matched result.
if (sourceChecksum == null || targetChecksum == null) {
return CopyMapper.ChecksumComparison.INCOMPATIBLE;
} else if (sourceChecksum.equals(targetChecksum)) {
return CopyMapper.ChecksumComparison.TRUE;
}
return CopyMapper.ChecksumComparison.FALSE;
}

/**
Expand All @@ -613,8 +623,12 @@ public static void compareFileLengthsAndChecksums(long srcLen,

//At this point, src & dest lengths are same. if length==0, we skip checksum
if ((srcLen != 0) && (!skipCrc)) {
if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target, srcLen)) {
CopyMapper.ChecksumComparison
checksumComparison = checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target, srcLen);
// If Checksum comparison is false set it to false, else set to true.
boolean checksumResult = !checksumComparison.equals(CopyMapper.ChecksumComparison.FALSE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this outcome right. as L632 should be reached for any outcome other than True.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll be setting "checksumResult" to be true for both "INCOMPATIBLE" and "TRUE" result from checksumsAreEqual() method else false and go through L632, so, we would be following the same flow as before since incompatible result from this method was true earlier too.

if (!checksumResult) {
StringBuilder errorMessage =
new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
.append(source).append(" and ").append(target).append(".");
Expand Down
14 changes: 6 additions & 8 deletions hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
Original file line number Diff line number Diff line change
Expand Up @@ -650,18 +650,16 @@ checksums if the checksum algorithm between the two stores is different.
cluster and that of the object store. Otherwise, changed files may be
missed/copied too often.

* `distcp.update.modification.time` can be used alongside the checksum check
in stores with same checksum algorithm as well. if set to true we check
both modification time and checksum between the files, but if this property
is set to false we only compare the checksum between the files to determine
if we should skip the copy or not.

To turn off, set this in your core-site.xml
* `distcp.update.modification.time` would only be used if either of the two
stores don't have checksum validation resulting in incompatible checksum
comparison between the two. Even if the property is set to true, it won't
be used if their is valid checksum comparison between the two stores.

To turn off the modification time check, set this in your core-site.xml
```xml
<property>
<name>distcp.update.modification.time</name>
<value>true</value>
<value>false</value>
</property>
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
Collections.singletonList(srcDir), destDir)
.withDeleteMissing(true)
.withSyncFolder(true)
.withSkipCRC(true)
.withSkipCRC(false)
.withDirectWrite(shouldUseDirectWrite())
.withOverwrite(false)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,11 @@ private void testCommitWithChecksumMismatch(boolean skipCrc)
Path sourcePath = new Path(sourceBase + srcFilename);
CopyListingFileStatus sourceCurrStatus =
new CopyListingFileStatus(fs.getFileStatus(sourcePath));
Assert.assertFalse(DistCpUtils.checksumsAreEqual(
Assert.assertFalse(!DistCpUtils.checksumsAreEqual(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to assertEquals here

fs, new Path(sourceBase + srcFilename), null,
fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen()));
fs, new Path(targetBase + srcFilename),
sourceCurrStatus.getLen())
.equals(CopyMapper.ChecksumComparison.FALSE));
} catch(IOException exception) {
if (skipCrc) {
LOG.error("Unexpected exception is found", exception);
Expand Down