diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index 2c282bf36ea..56eb78e4f75 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -20,17 +20,14 @@ import java.io.IOException; import java.net.URI; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.data.management.copy.CopyConfiguration; -import org.apache.gobblin.data.management.copy.PreserveAttributes; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.filesystem.DataFileVersionStrategy; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,6 +53,8 @@ import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata; import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.PreserveAttributes; import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity; import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import org.apache.gobblin.data.management.copy.entities.PrePublishStep; @@ -71,8 +70,10 @@ import org.apache.gobblin.metrics.event.sla.SlaEventKeys; import org.apache.gobblin.publisher.DataPublisher; import org.apache.gobblin.publisher.UnpublishedHandling; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.WriterUtils; +import org.apache.gobblin.util.filesystem.DataFileVersionStrategy; /** @@ -222,6 +223,80 @@ private void preserveFileAttrInPublisher(CopyableFile copyableFile) throws IOExc } } + /** Organizes and encapsulates access to {@link WorkUnitState}s according to useful access patterns. */ + @AllArgsConstructor + private static class WorkUnitStatesHelper { + private final Collection workUnitStates; + + public boolean isEmpty() { + return workUnitStates.isEmpty(); + } + + public WorkUnitState getAny() { + return workUnitStates.stream().findFirst() + .orElseThrow(() -> new RuntimeException("no WorkUnitStates - pre-check `isEmpty()` next time!")); + } + + public Collection getAll() { + return workUnitStates; + } + + public boolean hasAnyCopyableFile() throws IOException { + for (WorkUnitState wus : workUnitStates) { + if (CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) { + return true; + } + } + return false; + } + + public List getPrePublishSteps() throws IOException { + return getCommitSteps(PrePublishStep.class); + } + + public List getPostPublishSteps() throws IOException { + return getCommitSteps(PostPublishStep.class); + } + + public List getPostPublishStates() throws IOException { + return getStatesForCommitStepCopyEntities(PostPublishStep.class); + } + + public List getNonPostPublishStates() throws IOException { + return getStatesForCopyEntitiesNotOf(PostPublishStep.class); + } + + private List getCommitSteps(Class baseClass) throws IOException { + return getStatesForCommitStepCopyEntities(baseClass).stream() + .map(wus -> (CommitStepCopyEntity) CopySource.deserializeCopyEntity(wus)) + .sorted(Comparator.comparingInt(CommitStepCopyEntity::getPriority)) + .map(CommitStepCopyEntity::getStep) + .collect(Collectors.toList()); + } + + private List getStatesForCommitStepCopyEntities(Class baseClass) + throws IOException { + List states = Lists.newArrayList(); + for (WorkUnitState wus : workUnitStates) { + if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) { + states.add(wus); + } + } + return states; + } + + private List getStatesForCopyEntitiesNotOf(Class exclusionBaseClass) + throws IOException { + List states = Lists.newArrayList(); + for (WorkUnitState wus : workUnitStates) { + if (!exclusionBaseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) { + states.add(wus); + } + } + return states; + } + } + /** * Publish data for a {@link CopyableDataset}. */ @@ -230,9 +305,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, Map additionalMetadata = Maps.newHashMap(); Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(), - "publishFileSet received an empty collection work units. This is an error in code."); + String.format("[%s] publishFileSet got empty work unit states. This is an error in code.", datasetAndPartition.identifier())); - WorkUnitState sampledWorkUnitState = datasetWorkUnitStates.iterator().next(); + WorkUnitStatesHelper statesHelper = new WorkUnitStatesHelper(datasetWorkUnitStates); + WorkUnitState sampledWorkUnitState = statesHelper.getAny(); CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize( sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET)); @@ -244,23 +320,23 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, Path datasetWriterOutputPath = new Path(writerOutputDir, datasetAndPartition.identifier()); log.info("Merging all split work units."); - DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates); + DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, statesHelper.getAll()); - log.info(String.format("[%s] Publishing fileSet from %s for dataset %s", datasetAndPartition.identifier(), - datasetWriterOutputPath, metadata.getDatasetURN())); + log.info("[{}] Publishing fileSet from {} for dataset {}", datasetAndPartition.identifier(), + datasetWriterOutputPath, metadata.getDatasetURN()); - List prePublish = getCommitSequence(datasetWorkUnitStates, PrePublishStep.class); - List postPublish = getCommitSequence(datasetWorkUnitStates, PostPublishStep.class); - log.info(String.format("[%s] Found %d prePublish steps and %d postPublish steps.", datasetAndPartition.identifier(), - prePublish.size(), postPublish.size())); + List prePublishSteps = statesHelper.getPrePublishSteps(); + List postPublishSteps = statesHelper.getPostPublishSteps(); + log.info("[{}] Found {} pre-publish steps and {} post-publish steps.", datasetAndPartition.identifier(), + prePublishSteps.size(), postPublishSteps.size()); - executeCommitSequence(prePublish); + executeCommitSequence(prePublishSteps); - if (hasCopyableFiles(datasetWorkUnitStates)) { + if (statesHelper.hasAnyCopyableFile()) { // Targets are always absolute, so we start moving from root (will skip any existing directories). HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/")); } else { - log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier())); + log.info("[{}] No copyable files in dataset. Proceeding to post-publish steps.", datasetAndPartition.identifier()); } this.fs.delete(datasetWriterOutputPath, true); @@ -269,7 +345,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, long datasetUpstreamTimestamp = Long.MAX_VALUE; Optional fileSetRoot = Optional.absent(); - for (WorkUnitState wus : datasetWorkUnitStates) { + // ensure every successful state is committed + // WARNING: this MUST NOT run before the WU is actually executed--hence NOT YET for post-publish steps! + // (that's because `WorkUnitState::getWorkingState()` returns `WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs yet to execute) + for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) { if (wus.getWorkingState() == WorkingState.SUCCESSFUL) { wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED); } @@ -300,9 +379,16 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, } } - // execute post publish commit steps after preserving file attributes, because some post publish step, - // e.g. SetPermissionCommitStep needs to set permissions - executeCommitSequence(postPublish); + // execute `postPublishSteps` after preserving file attributes, as some, like `SetPermissionCommitStep`, will themselves set permissions + executeCommitSequence(postPublishSteps); + + // since `postPublishSteps` have now executed, finally ready to ensure every successful WU state of those gets committed + for (WorkUnitState wus : statesHelper.getPostPublishStates()) { + if (wus.getWorkingState() == WorkingState.SUCCESSFUL) { + wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + } + // NOTE: no need for `CopyableFile`-specific custom handling, as above, because `PostPublishStep extends CommitStepCopyEntity` and so could not be one + } // if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use // something more readable @@ -320,42 +406,6 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata); } - - private static boolean hasCopyableFiles(Collection workUnits) throws IOException { - for (WorkUnitState wus : workUnits) { - if (CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) { - return true; - } - } - return false; - } - - private static List getCommitSequence(Collection workUnits, Class baseClass) - throws IOException { - List steps = Lists.newArrayList(); - for (WorkUnitState wus : workUnits) { - if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) { - CommitStepCopyEntity step = (CommitStepCopyEntity) CopySource.deserializeCopyEntity(wus); - steps.add(step); - } - } - - Comparator commitStepSorter = new Comparator() { - @Override - public int compare(CommitStepCopyEntity o1, CommitStepCopyEntity o2) { - return Integer.compare(o1.getPriority(), o2.getPriority()); - } - }; - - Collections.sort(steps, commitStepSorter); - List sequence = Lists.newArrayList(); - for (CommitStepCopyEntity entity : steps) { - sequence.add(entity.getStep()); - } - - return sequence; - } - private static void executeCommitSequence(List steps) throws IOException { for (CommitStep step : steps) { step.execute();