DistCpConstants.java
@@ -142,6 +142,19 @@ private DistCpConstants() {
"distcp.blocks.per.chunk";

public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";

/**
* Enables distcp -update to use the modification times of the source and
* target files when deciding whether to copy a file that has the same
* size but possibly different content.
*
* If the target file is perceived as older than the source, the source
* has been updated since the last sync and is a newer version, so the
* copy should not be skipped.
* {@value}
*/
public static final String CONF_LABEL_UPDATE_MOD_TIME =
"distcp.update.modification.time";

/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
*/
CopyMapper.java
@@ -85,6 +85,7 @@ static enum FileAction {
private boolean append = false;
private boolean verboseLog = false;
private boolean directWrite = false;
private boolean useModTimeToUpdate = true;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);

private FileSystem targetFS = null;
@@ -114,6 +115,8 @@ public void setup(Context context) throws IOException, InterruptedException {
PRESERVE_STATUS.getConfigLabel()));
directWrite = conf.getBoolean(
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
useModTimeToUpdate =
conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, true);

targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
@@ -354,10 +357,32 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
boolean sameLength = target.getLen() == source.getLen();
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
|| !preserve.contains(FileAttribute.BLOCKSIZE);
// Skip the copy if a 0 size file is being copied.
if (sameLength && source.getLen() == 0) {
return true;
}
// If both the source and target have the same length, then:
// - when the config to use modification time is set to true, use both
//   the modification time and the checksum comparison to determine
//   whether the copy can be skipped;
// - when it is not set, use the checksum comparison alone.
//
// Note: different object stores can use different checksum algorithms,
// in which case no checksum comparison is possible and the check always
// returns true. Enabling the modification time check helps avoid
// incorrectly skipping a copy in these scenarios. Refer: HADOOP-18596.
if (sameLength && sameBlockSize) {
return skipCrc ||
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath(), source.getLen());
if (useModTimeToUpdate) {
return
(source.getModificationTime() < target.getModificationTime()) &&
(skipCrc || DistCpUtils.checksumsAreEqual(sourceFS,
source.getPath(), null,
targetFS, target.getPath(), source.getLen()));
} else {
return skipCrc || DistCpUtils
.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath(), source.getLen());
}
} else {
return false;
}
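To illustrate the decision table that the new canSkip() branch above implements, here is a minimal standalone sketch. It is a hypothetical helper, not code from the patch; note that a checksum comparison that cannot be made (for example across stores with different checksum algorithms) reports a match, which is why the modification time becomes the remaining signal.

```java
/**
 * Hypothetical illustration of the skip decision in canSkip() above,
 * for files that already have equal length and block size.
 */
final class SkipDecisionSketch {
  /**
   * @param useModTimeToUpdate value of distcp.update.modification.time.
   * @param skipCrc true when the CRC check is disabled.
   * @param checksumsMatch result of the checksum comparison; this is also
   *                       true when checksums are unavailable or cannot
   *                       be compared across stores.
   * @param sourceModTime modification time of the source file.
   * @param targetModTime modification time of the target file.
   * @return true if the copy can be skipped.
   */
  static boolean canSkipCopy(boolean useModTimeToUpdate, boolean skipCrc,
      boolean checksumsMatch, long sourceModTime, long targetModTime) {
    boolean checksumOk = skipCrc || checksumsMatch;
    if (useModTimeToUpdate) {
      // Skip only when the source is strictly older than the target,
      // i.e. the source has not changed since the last sync.
      return sourceModTime < targetModTime && checksumOk;
    }
    // Modification time check disabled: fall back to checksums alone.
    return checksumOk;
  }
}
```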
41 changes: 33 additions & 8 deletions hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -631,14 +631,39 @@ hadoop distcp -update -numListstatusThreads 20 \
Because object stores are slow to list files, consider setting the `-numListstatusThreads` option when performing a `-update` operation
on a large directory tree (the limit is 40 threads).

When `DistCp -update` is used with object stores,
generally only the modification time and length of the individual files are compared,
not any checksums. The fact that most object stores do have valid timestamps
for directories is irrelevant; only the file timestamps are compared.
However, it is important to have the clock of the client computers close
to that of the infrastructure, so that timestamps are consistent between
the client/HDFS cluster and that of the object store. Otherwise, changed files may be
missed/copied too often.
When `DistCp -update` is used with object stores, generally only the
modification time and length of the individual files are compared; checksums
are not compared if the two stores use different checksum algorithms.

* A `distcp -update` between two object stores with different checksum
algorithms compares the modification times of the source and target files
along with the file size to determine whether to skip the file copy. The
behavior is controlled by the property `distcp.update.modification.time`,
which is set to true by default. If the source file was modified more
recently than the target file, it is assumed that the content has changed
and the file should be updated.
We need to ensure that there is no clock skew between the machines.
The fact that most object stores do have valid timestamps for directories
is irrelevant; only the file timestamps are compared. However, it is
important to have the clock of the client computers close to that of the
infrastructure, so that timestamps are consistent between the client/HDFS
cluster and that of the object store. Otherwise, changed files may be
missed/copied too often.

* `distcp.update.modification.time` can also be used alongside the checksum
check in stores that share the same checksum algorithm. If set to true, both
the modification time and the checksum of the files are checked; if set to
false, only the checksums are compared to determine whether the copy should
be skipped.

Review comment (Contributor): Really? I think if checksums are matching then
timestamps shouldn't be compared at all. If two files' checksums match, that
is sufficient to say "they are the same".

Reply (Contributor Author): The timestamps are only used alongside checksums
if we have set the config to true; otherwise we follow the default way that
is offered today (so we can switch it off in cases where we know checksums
would work).
Since S3A/ABFS have checksums disabled, we are returned null for the
checksum value, so we'll always see true for that case. But it can also be
true for cases where the checksums actually are identical, so if we rely on
the checksum check being true and then don't compare the timestamps, that
can give false skips.
So, should we check the timestamps inside of the checksum check instead?
That is, if the checksums for both source and target are not null and the
property is set to true, then do the mod time check? This would add a few
more changes, as we would need to change the params inside different
classes to pass the config value as well.
We can always have the default value as false and use the property only in
the cases we want, to keep the default behaviour as the one offered today.

Reply (Contributor): ok, and the default option is "don't use checksums". I
was thinking whether we would want to have this on automatically if you are
on -skipCrc or the formats are incompatible, but if we leave it as something
to explicitly ask for, your code looks right.

To turn this off, set the following in your core-site.xml:

```xml
<property>
  <name>distcp.update.modification.time</name>
  <value>false</value>
</property>
```
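For programmatic use, the same flag can be set on the job configuration before launching the copy. Below is a minimal sketch using the DistCp Java API; the source and target paths are illustrative placeholders, and the flag is read through the `DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME` constant added in this change.

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;

public class DistCpUpdateModTimeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Turn the modification time check off so that -update relies on the
    // length/checksum comparison only; the default is true.
    conf.setBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, false);

    // Source and target URIs are illustrative placeholders.
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("s3a://source-bucket/data")),
        new Path("hdfs://nn:8020/backup/data"))
        .withSyncFolder(true)   // equivalent of the -update flag
        .build();

    // Launch the DistCp job and wait for it to complete.
    new DistCp(conf, options).execute();
  }
}
```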

**Notes**

AbstractContractDistCpTest.java
@@ -29,6 +29,7 @@
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -72,6 +73,9 @@ public abstract class AbstractContractDistCpTest
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractDistCpTest.class);

/** Offset used to change modification times in tests. */
private static final long MODIFICATION_TIME_OFFSET = 10000;

public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
= "scale.test.distcp.file.size.kb";

@@ -354,6 +358,29 @@ private Job distCpUpdate(final Path srcDir, final Path destDir)
.withOverwrite(false)));
}

/**
* Run distcp -update from srcDir to destDir using the supplied filesystems.
* @param srcDir source directory.
* @param destDir destination directory.
* @param sourceFs filesystem of the source.
* @param targetFs filesystem of the destination.
* @return the completed job
* @throws Exception any failure.
*/
private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
FileSystem sourceFs, FileSystem targetFs)
throws Exception {
describe("\nDistcp -update from " + srcDir + " to " + destDir);
lsR("Source Fs to update", sourceFs, srcDir);
lsR("Target Fs before update", targetFs, destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDeleteMissing(true)
.withSyncFolder(true)
.withSkipCRC(true)
.withDirectWrite(shouldUseDirectWrite())
.withOverwrite(false)));
}

/**
* Update the source directories as various tests expect,
* including adding a new file.
@@ -857,4 +884,122 @@ public void testDistCpWithUpdateExistFile() throws Exception {
verifyFileContents(localFS, dest, block);
}

@Test
public void testDistCpUpdateCheckFileSkip() throws Exception {
describe("Distcp update to check file skips.");

Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");

Path source0byte = new Path(remoteDir, "file_0byte");
Path dest0byte = new Path(localDir, "file_0byte");
dest = localFS.makeQualified(dest);
dest0byte = localFS.makeQualified(dest0byte);

// Create a source file with a known dataset.
byte[] sourceBlock = dataset(10, 'a', 'z');

// Write the dataset and also create the target path.
ContractTestUtils.createFile(localFS, dest, true, sourceBlock);
ContractTestUtils
.writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
1024, true);

// Create 0 byte source and target files.
ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]);
ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]);

// Execute the distcp -update job.
Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);

// First distcp -update would normally copy the source to dest.
verifyFileContents(localFS, dest, sourceBlock);
// Verify 1 file was skipped in the distcp -update (The 0 byte file).
// Verify 1 file was copied in the distcp -update (The new source file).
verifySkipAndCopyCounter(job, 1, 1);

// Remove the source file and replace with a file with same name and size
// but different content.
remoteFS.delete(source, false);
Path updatedSource = new Path(remoteDir, "file");
byte[] updatedSourceBlock = dataset(10, 'b', 'z');
ContractTestUtils.writeDataset(remoteFS, updatedSource,
updatedSourceBlock, updatedSourceBlock.length, 1024, true);

// For testing purposes, take the modification time of the updated source
// file and set the target file's modification time to that value plus or
// minus an offset. This lets the test emulate both scenarios: the source
// being changed more recently than the target (so the copy takes place)
// and the target being changed more recently than the source (so the
// copy is skipped because the source has not been updated since).
FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
long modTimeSourceUpd = fsSourceUpd.getModificationTime();

// Add an offset large enough to keep the test from failing due to
// race conditions.
long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
localFS.setTimes(dest, newTargetModTimeNew, -1);

// Execute the distcp -update job.
Job updatedSourceJobOldSrc =
distCpUpdateWithFs(remoteDir, localDir, remoteFS,
localFS);

// File contents should remain the same since the mod time of the target
// is newer than that of the updated source, which indicates that the
// sync happened more recently and there is no update.
verifyFileContents(localFS, dest, sourceBlock);
// Both the 0 byte file and the source file were skipped (since the mod
// time of the target is newer than the source, the source is perceived
// as an older version and its copy can be skipped).
verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);

// Subtract an offset large enough to keep the test from failing due to
// race conditions (bounded below at 0 to avoid a negative timestamp).
long newTargetModTimeOld =
Math.max(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
localFS.setTimes(dest, newTargetModTimeOld, -1);

// Execute the distcp -update job.
Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
remoteFS,
localFS);

// Verify that the target directory has both the 0 byte file and the
// content file.
Assertions
.assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
.hasSize(2);
// Now the copy should take place and the file contents should change
// since the mod time for target is older than the source file indicating
// that there was an update to the source after the last sync took place.
verifyFileContents(localFS, dest, updatedSourceBlock);
// Verify we skipped the 0 byte file and copied the updated source
// file (since the modification time of the target is now older than
// that of the new source).
verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
}

/**
* Method to check the skipped and copied counters of a distcp job.
*
* @param job job to check.
* @param skipExpectedValue expected skip counter value.
* @param copyExpectedValue expected copy counter value.
* @throws IOException thrown in case of failures.
*/
private void verifySkipAndCopyCounter(Job job,
int skipExpectedValue, int copyExpectedValue) throws IOException {
// get the skip and copy counters from the job.
long skipActualValue = job.getCounters()
.findCounter(CopyMapper.Counter.SKIP).getValue();
long copyActualValue = job.getCounters()
.findCounter(CopyMapper.Counter.COPY).getValue();
// Verify if the actual values equals the expected ones.
assertEquals("Mismatch in COPY counter value", copyExpectedValue,
copyActualValue);
assertEquals("Mismatch in SKIP counter value", skipExpectedValue,
skipActualValue);
}
}