diff --git a/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/audit/TestAuditClient.java similarity index 100% rename from gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java rename to gobblin-completeness/src/test/java/org/apache/gobblin/completeness/audit/TestAuditClient.java diff --git a/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java similarity index 100% rename from gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java rename to gobblin-completeness/src/test/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java index f0904280824..6969123e58c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java @@ -28,13 +28,18 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.commit.CommitStep; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import org.apache.gobblin.data.management.copy.entities.PrePublishStep; import org.apache.gobblin.data.management.partition.FileSet; import org.apache.gobblin.util.commit.DeleteFileCommitStep; +import 
org.apache.gobblin.util.commit.SetPermissionCommitStep; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -88,9 +93,14 @@ public Iterator> getFileSetIterator(FileSystem targetFs, Cop + "%s, you can specify multi locations split by '',", manifestPath.toString(), manifestReadFs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION)); } + CopyManifest.CopyableUnitIterator manifests = null; List copyEntities = Lists.newArrayList(); List toDelete = Lists.newArrayList(); + // map of paths and permissions sorted by depth of path, so that permissions can be set in order + Map ancestorOwnerAndPermissions = new TreeMap<>( + (o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(), o1.chars().filter(ch -> ch == '/').count())); + try { long startTime = System.currentTimeMillis(); manifests = CopyManifest.getReadIterator(this.manifestReadFs, this.manifestPath); @@ -118,6 +128,13 @@ public Iterator> getFileSetIterator(FileSystem targetFs, Cop CopyableFile copyableFile = copyableFileBuilder.build(); copyableFile.setFsDatasets(srcFs, targetFs); copyEntities.add(copyableFile); + + Path fromPath = srcFs.getFileStatus(fileToCopy).isDirectory() ? 
fileToCopy : fileToCopy.getParent(); + + ancestorOwnerAndPermissions.putAll( + CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs, fromPath, + new Path(commonFilesParent), configuration)); + if (existOnTarget && srcFile.isFile()) { // this is to match the existing publishing behavior where we won't rewrite the target when it's already existed // todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish @@ -128,6 +145,12 @@ public Iterator> getFileSetIterator(FileSystem targetFs, Cop toDelete.add(targetFs.getFileStatus(fileToCopy)); } } + + Properties props = new Properties(); + props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true"); + CommitStep setPermissionCommitStep = new SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props); + copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), setPermissionCommitStep, 1)); + if (!toDelete.isEmpty()) { //todo: add support sync for empty dir CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.absent()); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index 954065d8225..2c282bf36ea 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -255,19 +255,19 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, prePublish.size(), postPublish.size())); executeCommitSequence(prePublish); + if (hasCopyableFiles(datasetWorkUnitStates)) { // Targets are always absolute, so we start moving from root (will skip any 
existing directories). HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/")); } else { log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier())); } - executeCommitSequence(postPublish); this.fs.delete(datasetWriterOutputPath, true); long datasetOriginTimestamp = Long.MAX_VALUE; long datasetUpstreamTimestamp = Long.MAX_VALUE; - Optional fileSetRoot = Optional.absent(); + Optional fileSetRoot = Optional.absent(); for (WorkUnitState wus : datasetWorkUnitStates) { if (wus.getWorkingState() == WorkingState.SUCCESSFUL) { @@ -300,6 +300,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, } } + // execute post publish commit steps after preserving file attributes, because some post publish steps, + // e.g. SetPermissionCommitStep, need to set permissions + executeCommitSequence(postPublish); + // if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use // something more readable if (Long.MAX_VALUE == datasetOriginTimestamp) { diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java index b337b4cbdb6..4fb7db59232 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.data.management.dataset; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -28,7 +29,10 @@ import org.apache.gobblin.data.management.copy.CopyEntity; import org.apache.gobblin.data.management.copy.ManifestBasedDataset; import 
org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import org.apache.gobblin.data.management.partition.FileSet; +import org.apache.gobblin.util.commit.SetPermissionCommitStep; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,14 +40,19 @@ import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.io.Files; + import static org.mockito.Mockito.*; public class ManifestBasedDatasetFinderTest { private FileSystem localFs; + private File tmpDir; public ManifestBasedDatasetFinderTest() throws IOException { localFs = FileSystem.getLocal(new Configuration()); + tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); } @Test @@ -81,7 +90,7 @@ public void testFindFiles() throws IOException, URISyntaxException { Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI); Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI); Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI); - Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(manifestPath)); + Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString()))); Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true); Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true); Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath)); @@ -96,7 +105,8 @@ public void testFindFiles() throws IOException, URISyntaxException { new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build()); Assert.assertTrue(fileSets.hasNext()); FileSet fileSet = fileSets.next(); - Assert.assertEquals(fileSet.getFiles().size(), 2); + Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 
post publish step + Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep); Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath); Mockito.verify(manifestReadFs, Mockito.times(1)).getFileStatus(manifestPath); Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath); diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle index c6626b299d3..984ec8fc8a9 100644 --- a/gobblin-iceberg/build.gradle +++ b/gobblin-iceberg/build.gradle @@ -52,7 +52,7 @@ dependencies { transitive = false } testCompile('org.apache.hadoop:hadoop-common:2.6.0') - testImplementation(testFixtures(project(":gobblin-completeness"))) + testCompile project(":gobblin-completeness").sourceSets.test.output testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests') testCompile externalDependency.testng testCompile externalDependency.mockito diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java index 660cd7875b4..0893915f92c 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java @@ -55,6 +55,9 @@ public static Path relativizePath(Path fullPath, Path pathPrefix) { * @return true if possibleAncestor is an ancestor of fullPath. */ public static boolean isAncestor(Path possibleAncestor, Path fullPath) { + if (fullPath == null) { + return false; + } return !relativizePath(fullPath, possibleAncestor).equals(getPathWithoutSchemeAndAuthority(fullPath)); }