Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,7 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop

private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission)
throws IOException {
if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) {
// if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync
return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
}
return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
// Copy only if source is newer than target or if the owner or permission is different
return fileInSource.getModificationTime() > fileInTarget.getModificationTime() || !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -79,39 +81,113 @@ public void testFindFiles() throws IOException, URISyntaxException {
Properties props = new Properties();
props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");

try (
FileSystem sourceFs = Mockito.mock(FileSystem.class);
try (FileSystem sourceFs = Mockito.mock(FileSystem.class);
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class);
) {
URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
URI MANIFEST_READ_FS_URI = new URI("manifest-read", "the.manifest-source.org", "/", null);
URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString())));
Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));
Mockito.when(destFs.exists(any(Path.class))).thenReturn(false);
Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Path path = (Path)args[0];
return localFs.makeQualified(path);
}).when(sourceFs).makeQualified(any(Path.class));
FileSystem destFs = Mockito.mock(FileSystem.class);) {
setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);

Iterator<FileSet<CopyEntity>> fileSets =
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build());
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs,
CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).getFileStatus(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath);
Mockito.verifyNoMoreInteractions(manifestReadFs);
Mockito.verify(sourceFs, Mockito.times(2)).exists(any(Path.class));
}
}

@Test
public void testFindFilesWithDifferentPermissions() throws IOException, URISyntaxException {

//Get manifest Path
Path manifestPath = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
// Test manifestDatasetFinder
Properties props = new Properties();
props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
try (
FileSystem sourceFs = Mockito.mock(FileSystem.class);
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class)
) {
setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
Mockito.when(destFs.exists(new Path("/tmp/dataset/test1.txt"))).thenReturn(true);
Mockito.when(destFs.exists(new Path("/tmp/dataset/test2.txt"))).thenReturn(false);
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString())));

List<AclEntry> aclEntrySource = AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true);
AclStatus aclStatusSource = new AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build();
Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource);
// Specify a different acl for the destination file so that it is recopied even though the modification time is the same
List<AclEntry> aclEntryDest = AclEntry.parseAclSpec("user::rwx,group::rw-,other::r--", true);
AclStatus aclStatusDest = new AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntryDest).build();
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);

Iterator<FileSet<CopyEntity>> fileSets =
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);

}
}

@Test
public void testIgnoreFilesWithSamePermissions() throws IOException, URISyntaxException {
//Get manifest Path
Path manifestPath = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath());
// Test manifestDatasetFinder
Properties props = new Properties();
props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta");
try (
FileSystem sourceFs = Mockito.mock(FileSystem.class);
FileSystem manifestReadFs = Mockito.mock(FileSystem.class);
FileSystem destFs = Mockito.mock(FileSystem.class)
) {
setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs);
Mockito.when(destFs.exists(new Path("/tmp/dataset/test1.txt"))).thenReturn(true);
Mockito.when(destFs.exists(new Path("/tmp/dataset/test2.txt"))).thenReturn(true);
Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString())));

List<AclEntry> aclEntrySource = AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true);
AclStatus aclStatusSource = new AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build();
Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource);
// Same as source acls, files should not be copied
AclStatus aclStatusDest = new AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntrySource).build();
Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest);

Iterator<FileSet<CopyEntity>> fileSets =
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 1); // Post publish step
}
}

private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, Path manifestPath, FileSystem manifestReadFs) throws IOException, URISyntaxException {
URI SRC_FS_URI = new URI("source", "the.source.org", "/", null);
URI MANIFEST_READ_FS_URI = new URI("manifest-read", "the.manifest-source.org", "/", null);
URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null);
Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString())));
Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath));

Mockito.doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Path path = (Path)args[0];
return localFs.makeQualified(path);
}).when(sourceFs).makeQualified(any(Path.class));
}
}