diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java index 6969123e58c..b81d5966397 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java @@ -176,10 +176,7 @@ public Iterator> getFileSetIterator(FileSystem targetFs, Cop private static boolean shouldCopy(FileSystem targetFs, FileStatus fileInSource, FileStatus fileInTarget, OwnerAndPermission replicatedPermission) throws IOException { - if (fileInSource.isDirectory() || fileInSource.getModificationTime() == fileInTarget.getModificationTime()) { - // if source is dir or source and dst has same version, we compare the permission to determine whether it needs another sync - return !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget); - } - return fileInSource.getModificationTime() > fileInTarget.getModificationTime(); + // Copy only if source is newer than target or if the owner or permission is different + return fileInSource.getModificationTime() > fileInTarget.getModificationTime() || !replicatedPermission.hasSameOwnerAndPermission(targetFs, fileInTarget); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java index 4fb7db59232..a0dbcacadcb 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java @@ -36,6 +36,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -79,34 +81,18 @@ public void testFindFiles() throws IOException, URISyntaxException { Properties props = new Properties(); props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/"); - try ( - FileSystem sourceFs = Mockito.mock(FileSystem.class); + try (FileSystem sourceFs = Mockito.mock(FileSystem.class); FileSystem manifestReadFs = Mockito.mock(FileSystem.class); - FileSystem destFs = Mockito.mock(FileSystem.class); - ) { - URI SRC_FS_URI = new URI("source", "the.source.org", "/", null); - URI MANIFEST_READ_FS_URI = new URI("manifest-read", "the.manifest-source.org", "/", null); - URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null); - Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI); - Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI); - Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI); - Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString()))); - Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true); - Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true); - Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath)); - Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath)); - Mockito.when(destFs.exists(any(Path.class))).thenReturn(false); - Mockito.doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - Path path = (Path)args[0]; - return localFs.makeQualified(path); - }).when(sourceFs).makeQualified(any(Path.class)); + FileSystem destFs = Mockito.mock(FileSystem.class);) { + setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs); + Iterator> fileSets = - new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build()); + new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, + CopyConfiguration.builder(destFs, props).build()); Assert.assertTrue(fileSets.hasNext()); FileSet fileSet = fileSets.next(); Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step - Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep); + Assert.assertTrue(((PostPublishStep) fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep); Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath); Mockito.verify(manifestReadFs, Mockito.times(1)).getFileStatus(manifestPath); Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath); @@ -114,4 +100,94 @@ public void testFindFiles() throws IOException, URISyntaxException { Mockito.verify(sourceFs, Mockito.times(2)).exists(any(Path.class)); } } + + @Test + public void testFindFilesWithDifferentPermissions() throws IOException, URISyntaxException { + + //Get manifest Path + Path manifestPath = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath()); + // Test manifestDatasetFinder + Properties props = new Properties(); + props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/"); + props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta"); + try ( + FileSystem sourceFs = Mockito.mock(FileSystem.class); + FileSystem manifestReadFs = Mockito.mock(FileSystem.class); + FileSystem destFs = Mockito.mock(FileSystem.class) + ) { + setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs); + Mockito.when(destFs.exists(new Path("/tmp/dataset/test1.txt"))).thenReturn(true); + Mockito.when(destFs.exists(new Path("/tmp/dataset/test2.txt"))).thenReturn(false); + Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString()))); + + List aclEntrySource = AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true); + AclStatus aclStatusSource = new AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build(); + Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource); + // Specify a different acl for the destination file so that it is recopied even though the modification time is the same + List aclEntryDest = AclEntry.parseAclSpec("user::rwx,group::rw-,other::r--", true); + AclStatus aclStatusDest = new AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntryDest).build(); + Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest); + + Iterator> fileSets = + new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build()); + Assert.assertTrue(fileSets.hasNext()); + FileSet fileSet = fileSets.next(); + Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step + Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep); + + } + } + + @Test + public void testIgnoreFilesWithSamePermissions() throws IOException, URISyntaxException { + //Get manifest Path + Path manifestPath = new Path(getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath()); + // Test manifestDatasetFinder + Properties props = new Properties(); + props.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/"); + props.setProperty("gobblin.copy.preserved.attributes", "rbugpvta"); + try ( + FileSystem sourceFs = Mockito.mock(FileSystem.class); + FileSystem manifestReadFs = Mockito.mock(FileSystem.class); + FileSystem destFs = Mockito.mock(FileSystem.class) + ) { + setSourceAndDestFsMocks(sourceFs, destFs, manifestPath, manifestReadFs); + Mockito.when(destFs.exists(new Path("/tmp/dataset/test1.txt"))).thenReturn(true); + Mockito.when(destFs.exists(new Path("/tmp/dataset/test2.txt"))).thenReturn(true); + Mockito.when(destFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString()))); + + List aclEntrySource = AclEntry.parseAclSpec("user::rwx,group::rwx,other::rwx", true); + AclStatus aclStatusSource = new AclStatus.Builder().group("group").owner("owner").addEntries(aclEntrySource).build(); + Mockito.when(sourceFs.getAclStatus(any(Path.class))).thenReturn(aclStatusSource); + // Same as source acls, files should not be copied + AclStatus aclStatusDest = new AclStatus.Builder().group("groupDest").owner("owner").addEntries(aclEntrySource).build(); + Mockito.when(destFs.getAclStatus(any(Path.class))).thenReturn(aclStatusDest); + + Iterator> fileSets = + new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build()); + Assert.assertTrue(fileSets.hasNext()); + FileSet fileSet = fileSets.next(); + Assert.assertEquals(fileSet.getFiles().size(), 1); // Post publish step + } + } + + private void setSourceAndDestFsMocks(FileSystem sourceFs, FileSystem destFs, Path manifestPath, FileSystem manifestReadFs) throws IOException, URISyntaxException { + URI SRC_FS_URI = new URI("source", "the.source.org", "/", null); + URI MANIFEST_READ_FS_URI = new URI("manifest-read", "the.manifest-source.org", "/", null); + URI DEST_FS_URI = new URI("dest", "the.dest.org", "/", null); + Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI); + Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI); + Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI); + Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString()))); + Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true); + Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true); + Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath)); + Mockito.when(manifestReadFs.open(manifestPath)).thenReturn(localFs.open(manifestPath)); + + Mockito.doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Path path = (Path)args[0]; + return localFs.makeQualified(path); + }).when(sourceFs).makeQualified(any(Path.class)); + } }