Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,14 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
boolean sameLength = target.getLen() == source.getLen();
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
|| !preserve.contains(FileAttribute.BLOCKSIZE);
if (sameLength && sameBlockSize) {
// checksum check to be done if same file len(greater than 0), same block
// size and the target file has been updated more recently than the source
// file.
// Note: for different cloud stores with different checksum algorithms,
// checksum comparisons are not performed, so we depend on the
// file size and modification time instead.
if (sameLength && (source.getLen() > 0) && sameBlockSize &&
source.getModificationTime() < target.getModificationTime()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the addition of the getLen() > 0? We want to always copy if it's an empty file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I actually had to add a check of if the file size is 0 to skip it every time before this check, forgot to add it in this version locally 😅. Good catch.

return skipCrc ||
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath(), source.getLen());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID;

import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -72,6 +78,9 @@ public abstract class AbstractContractDistCpTest
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractDistCpTest.class);

/** Using offset to change modification time in tests. */
private static final long MODIFICATION_TIME_OFFSET = 10000;

public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
= "scale.test.distcp.file.size.kb";

Expand Down Expand Up @@ -857,4 +866,83 @@ public void testDistCpWithUpdateExistFile() throws Exception {
verifyFileContents(localFS, dest, block);
}

@Test
public void testDistCpUpdateCheckFileSkip() throws Exception {
  describe("Distcp update to check file skips.");

  Path source = new Path(remoteDir, "file");
  Path dest = new Path(localDir, "file");
  dest = localFS.makeQualified(dest);

  // Creating a source file with a known dataset.
  byte[] sourceBlock = dataset(10, 'a', 'z');

  // Write the dataset and also create the (empty) target path.
  try (FSDataOutputStream out = remoteFS.create(source)) {
    out.write(sourceBlock);
    // Close the stream immediately: the original code leaked this
    // FSDataOutputStream, which can leave the target file unflushed
    // on some filesystems.
    localFS.create(dest).close();
  }

  verifyPathExists(remoteFS, "", source);
  verifyPathExists(localFS, "", dest);
  DistCpTestUtils
      .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
          localDir.toString(), "-delete -update" + getDefaultCLIOptions(),
          conf);

  // First distcp -update would normally copy the source to dest.
  verifyFileContents(localFS, dest, sourceBlock);

  // Remove the source file and replace it with a file of the same name
  // and size but different content.
  remoteFS.delete(source, false);
  Path updatedSource = new Path(remoteDir, "file");
  byte[] updatedSourceBlock = dataset(10, 'b', 'z');
  try (FSDataOutputStream out = remoteFS.create(updatedSource)) {
    out.write(updatedSourceBlock);
  }

  // For testing purposes we take the modification time of the updated
  // source file and set the target file's modification time to that value
  // plus or minus an offset. This emulates the two interesting cases:
  // - target newer than source: -update should skip the copy;
  // - target older than source: -update should perform the copy.
  FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
  long modTimeSourceUpd = fsSourceUpd.getModificationTime();

  // Add an offset large enough that the test cannot fail due to clock
  // granularity or race conditions.
  long newerTargetModTime = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
  localFS.setTimes(dest, newerTargetModTime, -1);

  DistCpTestUtils
      .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
          localDir.toString(), "-delete -update" + getDefaultCLIOptions(),
          conf);

  // File contents should remain the same since the mod time for the target
  // is newer than the updated source, which indicates the sync happened
  // more recently and there is no update.
  verifyFileContents(localFS, dest, sourceBlock);

  // Subtract the offset so the target looks older than the source.
  // BUG FIX: the original used Math.min(modTimeSourceUpd - OFFSET, 0),
  // which always yields 0 (the epoch) for any realistic timestamp; the
  // intent is to clamp the subtracted value at 0, i.e. Math.max.
  long olderTargetModTime =
      Math.max(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
  localFS.setTimes(dest, olderTargetModTime, -1);

  DistCpTestUtils
      .assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
          localDir.toString(), "-delete -update" + getDefaultCLIOptions(),
          conf);

  Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
      .hasSize(1);
  // Now the copy should take place and the file contents should change,
  // since the mod time for the target is older than the source file,
  // indicating the source was updated after the last sync.
  verifyFileContents(localFS, dest, updatedSourceBlock);
}
}