Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -56,6 +53,8 @@
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
Expand All @@ -71,8 +70,10 @@
import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;


/**
Expand Down Expand Up @@ -222,6 +223,80 @@ private void preserveFileAttrInPublisher(CopyableFile copyableFile) throws IOExc
}
}

/** Organizes and encapsulates access to {@link WorkUnitState}s according to useful access patterns. */
@AllArgsConstructor
private static class WorkUnitStatesHelper {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not for this PR, but if this stems from multiple categories of workloads being misidentified maybe we should have this as its own separate class and have a stronger type system around workunit collections?

Copy link
Copy Markdown
Contributor Author

@phet phet Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see mis-identification as the underlying reason for the regression. rather it was more likely not appreciating how presumptuous WorkUnitState::getWorkingState() is about the WU it's invoked on having already executed. I doubt it was clear (certainly it shocked me!) that it would fall back on the enclosing job's state to provide state for a WU that had itself never run!

as for the WorkUnitStatesHelper, I don't presently foresee utility beyond CopyDataPublisher. if the opportunity for leverage arises later, we can change from private and/or unnest it at that point.

private final Collection<WorkUnitState> workUnitStates;

public boolean isEmpty() {
return workUnitStates.isEmpty();
}

public WorkUnitState getAny() {
return workUnitStates.stream().findFirst()
.orElseThrow(() -> new RuntimeException("no WorkUnitStates - pre-check `isEmpty()` next time!"));
}

public Collection<WorkUnitState> getAll() {
return workUnitStates;
}

public boolean hasAnyCopyableFile() throws IOException {
for (WorkUnitState wus : workUnitStates) {
if (CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
return true;
}
}
return false;
}

public List<CommitStep> getPrePublishSteps() throws IOException {
return getCommitSteps(PrePublishStep.class);
}

public List<CommitStep> getPostPublishSteps() throws IOException {
return getCommitSteps(PostPublishStep.class);
}

public List<WorkUnitState> getPostPublishStates() throws IOException {
return getStatesForCommitStepCopyEntities(PostPublishStep.class);
}

public List<WorkUnitState> getNonPostPublishStates() throws IOException {
return getStatesForCopyEntitiesNotOf(PostPublishStep.class);
}

private List<CommitStep> getCommitSteps(Class<? extends CommitStepCopyEntity> baseClass) throws IOException {
return getStatesForCommitStepCopyEntities(baseClass).stream()
.map(wus -> (CommitStepCopyEntity) CopySource.deserializeCopyEntity(wus))
.sorted(Comparator.comparingInt(CommitStepCopyEntity::getPriority))
.map(CommitStepCopyEntity::getStep)
.collect(Collectors.toList());
}

private List<WorkUnitState> getStatesForCommitStepCopyEntities(Class<? extends CommitStepCopyEntity> baseClass)
throws IOException {
List<WorkUnitState> states = Lists.newArrayList();
for (WorkUnitState wus : workUnitStates) {
if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
states.add(wus);
}
}
return states;
}

private List<WorkUnitState> getStatesForCopyEntitiesNotOf(Class<? extends CommitStepCopyEntity> exclusionBaseClass)
throws IOException {
List<WorkUnitState> states = Lists.newArrayList();
for (WorkUnitState wus : workUnitStates) {
if (!exclusionBaseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
states.add(wus);
}
}
return states;
}
}

/**
* Publish data for a {@link CopyableDataset}.
*/
Expand All @@ -230,9 +305,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
Map<String, String> additionalMetadata = Maps.newHashMap();

Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
"publishFileSet received an empty collection work units. This is an error in code.");
String.format("[%s] publishFileSet got empty work unit states. This is an error in code.", datasetAndPartition.identifier()));

WorkUnitState sampledWorkUnitState = datasetWorkUnitStates.iterator().next();
WorkUnitStatesHelper statesHelper = new WorkUnitStatesHelper(datasetWorkUnitStates);
WorkUnitState sampledWorkUnitState = statesHelper.getAny();

CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
Expand All @@ -244,23 +320,23 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
Path datasetWriterOutputPath = new Path(writerOutputDir, datasetAndPartition.identifier());

log.info("Merging all split work units.");
DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, statesHelper.getAll());

log.info(String.format("[%s] Publishing fileSet from %s for dataset %s", datasetAndPartition.identifier(),
datasetWriterOutputPath, metadata.getDatasetURN()));
log.info("[{}] Publishing fileSet from {} for dataset {}", datasetAndPartition.identifier(),
datasetWriterOutputPath, metadata.getDatasetURN());

List<CommitStep> prePublish = getCommitSequence(datasetWorkUnitStates, PrePublishStep.class);
List<CommitStep> postPublish = getCommitSequence(datasetWorkUnitStates, PostPublishStep.class);
log.info(String.format("[%s] Found %d prePublish steps and %d postPublish steps.", datasetAndPartition.identifier(),
prePublish.size(), postPublish.size()));
List<CommitStep> prePublishSteps = statesHelper.getPrePublishSteps();
List<CommitStep> postPublishSteps = statesHelper.getPostPublishSteps();
log.info("[{}] Found {} pre-publish steps and {} post-publish steps.", datasetAndPartition.identifier(),
prePublishSteps.size(), postPublishSteps.size());

executeCommitSequence(prePublish);
executeCommitSequence(prePublishSteps);

if (hasCopyableFiles(datasetWorkUnitStates)) {
if (statesHelper.hasAnyCopyableFile()) {
// Targets are always absolute, so we start moving from root (will skip any existing directories).
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
} else {
log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier()));
log.info("[{}] No copyable files in dataset. Proceeding to post-publish steps.", datasetAndPartition.identifier());
}

this.fs.delete(datasetWriterOutputPath, true);
Expand All @@ -269,7 +345,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
long datasetUpstreamTimestamp = Long.MAX_VALUE;
Optional<String> fileSetRoot = Optional.absent();

for (WorkUnitState wus : datasetWorkUnitStates) {
// ensure every successful state is committed
// WARNING: this MUST NOT run before the WU is actually executed--hence NOT YET for post-publish steps!
// (that's because `WorkUnitState::getWorkingState()` returns `WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs yet to execute)
for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
}
Expand Down Expand Up @@ -300,9 +379,16 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
}
}

// execute post publish commit steps after preserving file attributes, because some post publish step,
// e.g. SetPermissionCommitStep needs to set permissions
executeCommitSequence(postPublish);
// execute `postPublishSteps` after preserving file attributes, as some, like `SetPermissionCommitStep`, will themselves set permissions
executeCommitSequence(postPublishSteps);

// since `postPublishSteps` have now executed, finally ready to ensure every successful WU state of those gets committed
for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
}
// NOTE: no need for `CopyableFile`-specific custom handling, as above, because `PostPublishStep extends CommitStepCopyEntity` and so could not be one
}

// if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use
// something more readable
Expand All @@ -320,42 +406,6 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata);
}


private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits) throws IOException {
for (WorkUnitState wus : workUnits) {
if (CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
return true;
}
}
return false;
}

private static List<CommitStep> getCommitSequence(Collection<WorkUnitState> workUnits, Class<?> baseClass)
throws IOException {
List<CommitStepCopyEntity> steps = Lists.newArrayList();
for (WorkUnitState wus : workUnits) {
if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
CommitStepCopyEntity step = (CommitStepCopyEntity) CopySource.deserializeCopyEntity(wus);
steps.add(step);
}
}

Comparator<CommitStepCopyEntity> commitStepSorter = new Comparator<CommitStepCopyEntity>() {
@Override
public int compare(CommitStepCopyEntity o1, CommitStepCopyEntity o2) {
return Integer.compare(o1.getPriority(), o2.getPriority());
}
};

Collections.sort(steps, commitStepSorter);
List<CommitStep> sequence = Lists.newArrayList();
for (CommitStepCopyEntity entity : steps) {
sequence.add(entity.getStep());
}

return sequence;
}

private static void executeCommitSequence(List<CommitStep> steps) throws IOException {
for (CommitStep step : steps) {
step.execute();
Expand Down