@@ -53,6 +53,7 @@ public class State implements WritableShim {

private static final Joiner LIST_JOINER = Joiner.on(",");
private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();
private static final JsonParser JSON_PARSER = new JsonParser();

private String id;

@@ -62,8 +63,6 @@ public class State implements WritableShim {
@Getter
private Properties specProperties;

private final JsonParser jsonParser = new JsonParser();

public State() {
this.specProperties = new Properties();
this.commonProperties = new Properties();
@@ -476,7 +475,7 @@ public boolean getPropAsBoolean(String key, boolean def) {
* @return {@link JsonArray} value associated with the key
*/
public JsonArray getPropAsJsonArray(String key) {
JsonElement jsonElement = this.jsonParser.parse(getProp(key));
JsonElement jsonElement = JSON_PARSER.parse(getProp(key));
Preconditions.checkArgument(jsonElement.isJsonArray(),
"Value for key " + key + " is malformed, it must be a JsonArray: " + jsonElement);
return jsonElement.getAsJsonArray();
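A minimal usage sketch of getPropAsJsonArray, with an illustrative key and values (not part of this diff):

State state = new State();
state.setProp("fruits", "[\"apple\", \"banana\"]"); // any string parseable as a JSON array
JsonArray fruits = state.getPropAsJsonArray("fruits"); // parses via the shared, now-static JSON_PARSER
// a non-array value, e.g. "{\"a\": 1}", would fail the Preconditions check with IllegalArgumentException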
1 change: 1 addition & 0 deletions gobblin-temporal/build.gradle
@@ -27,6 +27,7 @@ dependencies {
compile project(":gobblin-api")
compile project(":gobblin-cluster")
compile project(":gobblin-core")
compile project(":gobblin-data-management")
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-metastore")
compile project(":gobblin-runtime")
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;


/** Activity for processing/executing a {@link org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */
@ActivityInterface
public interface ProcessWorkUnit {
@ActivityMethod
// CAUTION: a void return type won't work, as apparently it may not be the return type for `io.temporal.workflow.Functions.Func1`!
int processWorkUnit(WorkUnitClaimCheck wu);
}
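A minimal sketch of how a workflow implementation might invoke this activity, assuming the standard Temporal SDK APIs; the timeout and variable names are illustrative, not from this PR. The non-void return type is what lets the method serve as an `io.temporal.workflow.Functions.Func1` and be awaited as a Promise:

import java.time.Duration;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;

// inside a workflow implementation method:
ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class,
    ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofMinutes(90)).build()); // illustrative timeout
Promise<Integer> pendingCount = Async.function(activityStub::processWorkUnit, workUnitClaimCheck);
int numTasks = pendingCount.get(); // awaits activity completion from within the workflow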
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import com.google.common.collect.Lists;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.AbstractTaskStateTracker;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskCreationException;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.util.JobLauncherUtils;


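/** {@link ProcessWorkUnit} impl that loads the claim-checked {@link WorkUnit}(s) from the {@link FileSystem} and executes them via {@link GobblinMultiTaskAttempt} */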
@Slf4j
public class ProcessWorkUnitImpl implements ProcessWorkUnit {
private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;

@Override
public int processWorkUnit(WorkUnitClaimCheck wu) {
try (FileSystem fs = Help.loadFileSystemForce(wu)) {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(), workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
return execute(workUnits, wu, jobState, fs);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}

protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSystem fs) throws IOException {
Path wuPath = new Path(wu.getWorkUnitPath());
WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(wuPath);
Help.deserializeStateWithRetries(fs, wuPath, workUnit, wu);
return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit));
}

/**
* NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
* @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
*/
protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs) throws IOException, InterruptedException {
String containerId = "container-id-for-wu-" + wu.getCorrelator();
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);

TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
TaskExecutor taskExecutor = new TaskExecutor(new Properties());
GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec

SharedResourcesBroker<GobblinScopeTypes> resourcesBroker = JobStateUtils.getSharedResourcesBroker(jobState);
AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter();
// AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties()));
troubleshooter.start();

List<String> fileSourcePaths = workUnits.stream()
.map(workUnit -> describeAsCopyableFile(workUnit, wu.getWorkUnitPath()))
.collect(Collectors.toList());
log.info("WU [{}] - submitting {} workUnits for copying files: {}", wu.getCorrelator(),
workUnits.size(), fileSourcePaths);
log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(), workUnits.get(0).toJsonString());

try {
GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
jobState.getJobId(), containerId, jobState, workUnits,
taskStateTracker, taskExecutor, taskStateStore, multiTaskAttemptCommitPolicy,
resourcesBroker, troubleshooter.getIssueRepository(), createInterruptionPredicate(fs, jobState));
return taskAttempt.getNumTasksCreated();
} catch (TaskCreationException tce) { // derived type of `IOException` that ought not be caught!
throw tce;
} catch (IOException ioe) {
// presume execution already occurred, with `TaskState` written to reflect outcome
log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite IOException:", ioe);
return 0;
}
}

/** Demonstration processing, to isolate debugging of WU loading and deserialization */
protected int countSumProperties(List<WorkUnit> workUnits, WorkUnitClaimCheck wu) {
int totalNumProps = workUnits.stream().mapToInt(workUnit -> workUnit.getPropertyNames().size()).sum();
log.info("opened WU [{}] to find {} properties total at '{}'", wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath());
return totalNumProps;
}

protected TaskStateTracker createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) {
return new AbstractTaskStateTracker(new Properties(), log) {
@Override
public void registerNewTask(Task task) {
// TODO: shall we schedule metrics update based on config?
}

@Override
public void onTaskRunCompletion(Task task) {
task.markTaskCompletion();
}

@Override
public void onTaskCommitCompletion(Task task) {
TaskState taskState = task.getTaskState();
// TODO: if metrics configured, report them now
log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", wu.getCorrelator(), task.getTaskId(),
taskState.getTaskDuration(), taskState.getWorkingState(),
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
? (" to: " + taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), task.getTaskId(),
taskState.toJsonString(shouldUseExtendedLogging(wu)));
getOptCopyableFile(taskState).ifPresent(copyableFile -> {
log.info("WU [{} = {}] - completed copyableFile: {}", wu.getCorrelator(), task.getTaskId(),
copyableFile.toJsonString(shouldUseExtendedLogging(wu)));
});
}
};
}

protected String describeAsCopyableFile(WorkUnit workUnit, String workUnitPath) {
return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
.map(copyableFile -> copyableFile.getOrigin().getPath().toString())
.orElse(
"<<not a CopyableFile("
+ getOptCopyEntityClass(workUnit, workUnitPath)
.map(Class::getSimpleName)
.orElse("<<not a CopyEntity!>>")
+ "): '" + workUnitPath + "'"
);
}

protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() + "'");
}

protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> workUnits, String workUnitPath) {
return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
);
}

protected Optional<CopyableFile> getOptCopyableFile(State state, String logDesc) {
return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
if (serialization != null) {
return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
}
}
return Optional.empty();
});
}

protected Optional<Class<?>> getOptCopyEntityClass(State state, String logDesc) {
try {
return Optional.of(CopySource.getCopyEntityClass(state));
} catch (IOException ioe) {
log.warn(logDesc + " - failed getting copy entity class:", ioe);
return Optional.empty();
}
}

protected Predicate<GobblinMultiTaskAttempt> createInterruptionPredicate(FileSystem fs, JobState jobState) {
// TODO - decide whether to support... and if so, employ a useful path; otherwise, just evaluate predicate to always false
Path interruptionPath = new Path("/not/a/real/path/that/should/ever/exist!");
return createInterruptionPredicate(fs, interruptionPath);
}

protected Predicate<GobblinMultiTaskAttempt> createInterruptionPredicate(FileSystem fs, Path interruptionPath) {
return (gmta) -> {
try {
return fs.exists(interruptionPath);
} catch (IOException ioe) {
return false;
}
};
}

protected boolean shouldUseExtendedLogging(WorkUnitClaimCheck wu) {
try {
Comment on lines +220 to +221

Contributor:
Is this function used just to have extended logging for a subset of the workers, so as not to overwhelm the logs?

Contributor Author (@phet, Nov 1, 2023):
No, it's actually just whether or not to dump the entire JobState or all the CopyableFile props. We only do that once every 100 WUs. This is rough and somewhat arbitrary... a quick solution, because I found it helpful to read the full state, but dumping it all became overwhelming when it happened, say, 15k times, because there were that many WUs.

It's not exactly to do such logging only on specific workers, so much as to do it only 1/Nth of the time, regardless of which worker might handle those WUs.
return Long.parseLong(wu.getCorrelator()) % LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE == 0;
} catch (NumberFormatException nfe) {
log.warn("unexpected, non-numeric correlator: '{}'", wu.getCorrelator());
return false;
}
}
}
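For orientation, a minimal sketch of registering this activity impl with a Temporal worker, assuming the standard Temporal SDK bootstrap; the service-stub choice and task-queue name are illustrative, not from this PR:

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); // illustrative: local dev connection
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("gobblin-work-units"); // illustrative task-queue name
worker.registerActivitiesImplementations(new ProcessWorkUnitImpl());
factory.start();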
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.launcher;

import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowOptions;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.util.PropertiesUtils;

import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;


/**
* A {@link JobLauncher} for the initial triggering of a Temporal workflow that executes {@link WorkUnit}s to fulfill
* the work they specify. see: {@link ProcessWorkUnitsWorkflow}
*
* <p>
* This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job.
* The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process.
* </p>
*/
@Slf4j
public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "name.node.uri";
public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.units.dir";

public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.branches.per.tree";
public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE = GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.sub.trees.per.tree";
Comment on lines +59 to +60

Contributor:
Should we generalize these configs into a shared, static TemporalConfig? The Load Generator also has similar configs for the same purpose of max branches/sub-trees.

Contributor Author:
That's true. I anticipate the impl will likely evolve into multiple nested exec workflows beneath the same top-level workflow, and each might legitimately be tuned/configured differently. I didn't spend too much time trying to project how that should work; instead, I expect the needs will clarify once we've reached that point in the impl, which should be in the next days and weeks (i.e. < months).


public static final String WORKFLOW_ID_BASE = "ProcessWorkUnits";

public ProcessWorkUnitsJobLauncher(
Properties jobProps,
Path appWorkDir,
List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap
) throws Exception {
super(jobProps, appWorkDir, metadataTags, runningMap);
}

@Override
public void submitJob(List<WorkUnit> workunits) {
try {
URI nameNodeUri = new URI(PropertiesUtils.getRequiredProp(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI));
// NOTE: `Path` is challenging for temporal to ser/de, but nonetheless do pre-construct as `Path`, to pre-validate this prop string's contents
Path workUnitsDir = new Path(PropertiesUtils.getRequiredProp(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR));
WUProcessingSpec wuSpec = new WUProcessingSpec(nameNodeUri, workUnitsDir.toString());
if (this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE) &&
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(this.jobProps, GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
}
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
.build();
ProcessWorkUnitsWorkflow workflow = this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
workflow.process(wuSpec);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
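A minimal sketch of the job properties that would drive this launcher, referencing its public constants so the actual arg-prefix value (defined in GobblinTemporalConfigurationKeys, not shown in this diff) need not be repeated; all values are illustrative:

Properties jobProps = new Properties();
jobProps.setProperty(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI,
    "hdfs://namenode.example.org:8020"); // illustrative name-node URI
jobProps.setProperty(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR,
    "/jobs/demo/work-units"); // illustrative work-units dir
// optional tuning: both props must be present for setTuning to be applied (see submitJob above)
jobProps.setProperty(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE, "900");
jobProps.setProperty(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE, "30");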