Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ public class ConfigurationKeys {
public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL = false;
public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
public static final boolean DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = false;

/**
* Configuration properties used by the distributed job launcher.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public MultiWorkUnit() {
super();
}

/**
 * Overrides the inherited check so that this type always identifies itself as a multi-work-unit.
 *
 * @return {@code true}, unconditionally
 */
@Override
public boolean isMultiWorkUnit() {
return true;
}

/**
* Get an immutable list of {@link WorkUnit}s wrapped by this {@link MultiWorkUnit}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringWriter;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonWriter;

import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.Watermark;
Expand Down Expand Up @@ -134,6 +136,11 @@ public WorkUnit(WorkUnit other) {
this.extract = other.getExtract();
}

/**
 * Tells whether this instance is a multi-work-unit (or else a singular one).
 * This implementation always answers {@code false}.
 *
 * @return {@code false}, unconditionally, in this implementation
 */
public boolean isMultiWorkUnit() {
return false; // more efficient than `this instanceof MultiWorkUnit` plus no circular dependency
}

/**
* Factory method.
*
Expand Down Expand Up @@ -365,6 +372,57 @@ public int hashCode() {
return result;
}

/**
 * Renders this work unit as tab-indented JSON by delegating to {@link #toJson(JsonWriter)}.
 *
 * @return pretty-printed JSON, including all properties; if an {@link IOException} is raised while
 *         writing, whatever text had been produced up to that point
 */
public String toJsonString() {
  final StringWriter buffer = new StringWriter();
  try (JsonWriter writer = new JsonWriter(buffer)) {
    writer.setIndent("\t");
    toJson(writer);
  } catch (IOException ignored) {
    // Intentionally ignored: the text accumulated so far is returned as-is.
  }
  return buffer.toString();
}

/**
 * Writes this work unit to {@code jsonWriter} as one JSON object.
 *
 * <p>The object holds {@code id}, then {@code properties} (each property name mapped to its value), then
 * {@code extract}. The {@code extract} object holds {@code extractId}, {@code extractProperties} and, only when
 * the extract reports a non-null previous table state, {@code extractPrevTableState} (itself made of
 * {@code prevStateId} and {@code prevStateProperties}).
 *
 * @param jsonWriter the writer to emit to; it is not closed by this method
 * @throws IOException if {@code jsonWriter} fails while writing
 */
public void toJson(JsonWriter jsonWriter) throws IOException {
  jsonWriter.beginObject();

  jsonWriter.name("id").value(this.getId());
  jsonWriter.name("properties");
  jsonWriter.beginObject();
  for (String key : this.getPropertyNames()) {
    jsonWriter.name(key).value(this.getProp(key));
  }
  jsonWriter.endObject();

  // Fetch the extract once: it is loop-invariant, so there is no reason to look it up again per property.
  final Extract extract = this.getExtract();

  jsonWriter.name("extract");
  jsonWriter.beginObject();
  jsonWriter.name("extractId").value(extract.getId());
  jsonWriter.name("extractProperties");
  jsonWriter.beginObject();
  for (String key : extract.getPropertyNames()) {
    jsonWriter.name(key).value(extract.getProp(key));
  }
  jsonWriter.endObject();

  // The previous table state is optional; its section is omitted entirely when absent.
  State prevTableState = extract.getPreviousTableState();
  if (prevTableState != null) {
    jsonWriter.name("extractPrevTableState");
    jsonWriter.beginObject();
    jsonWriter.name("prevStateId").value(prevTableState.getId());
    jsonWriter.name("prevStateProperties");
    jsonWriter.beginObject();
    for (String key : prevTableState.getPropertyNames()) {
      jsonWriter.name(key).value(prevTableState.getProp(key));
    }
    jsonWriter.endObject(); // prevStateProperties
    jsonWriter.endObject(); // extractPrevTableState
  }
  jsonWriter.endObject(); // extract

  jsonWriter.endObject();
}

public String getOutputFilePath() {
// Search for the properties in the workunit.
// This search for the property first in State and then in the Extract of this workunit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
Expand Down Expand Up @@ -110,8 +109,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {

private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class);

private static final String WORK_UNIT_FILE_EXTENSION = ".wu";

private final HelixManager helixManager;
private final TaskDriver helixTaskDriver;
private final String helixWorkFlowName;
Expand Down Expand Up @@ -345,7 +342,7 @@ JobConfig.Builder createHelixJob(List<WorkUnit> workUnits) throws IOException {
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
int multiTaskIdSequence = 0;
for (WorkUnit workUnit : workUnits) {
if (workUnit instanceof MultiWorkUnit) {
if (workUnit.isMultiWorkUnit()) {
workUnit.setId(JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++));
}
addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap);
Expand Down Expand Up @@ -535,15 +532,12 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map
private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stateSerDeRunner) {
String workUnitFilePath =
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
final StateStore stateStore;
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
stateStore = stateStores.getMwuStateStore();
} else {
stateStore = stateStores.getWuStateStore();
}
final StateStore stateStore = JobLauncherUtils.hasMultiWorkUnitExtension(workUnitFile)
? stateStores.getMwuStateStore()
: stateStores.getWuStateStore();
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
Expand All @@ -561,11 +555,11 @@ private String persistWorkUnit(final Path workUnitFileDir, final WorkUnit workUn
final StateStore stateStore;
String workUnitFileName = workUnit.getId();

if (workUnit instanceof MultiWorkUnit) {
workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION;
if (workUnit.isMultiWorkUnit()) {
workUnitFileName += JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getMwuStateStore();
} else {
workUnitFileName += WORK_UNIT_FILE_EXTENSION;
workUnitFileName += JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getWuStateStore();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.StateStores;
Expand Down Expand Up @@ -184,7 +183,7 @@ protected List<WorkUnit> getWorkUnits()
WorkUnit workUnit;

try {
if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
if (JobLauncherUtils.hasMultiWorkUnitExtension(_workUnitFilePath)) {
workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0);
} else {
workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.gobblin.data.management.copy;

import com.google.common.cache.Cache;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;

Expand All @@ -34,8 +34,10 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.stream.JsonWriter;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -132,6 +134,52 @@ public CopyableFile(FileStatus origin, Path destination, OwnerAndPermission dest
this.datasetOutputPath = datasetOutputPath;
}

/**
 * Convenience overload: delegates to {@link #toJsonString(boolean)} with {@code true}.
 *
 * @return pretty-printed JSON, including all metadata
 */
public String toJsonString() {
return toJsonString(true);
}

/**
 * Renders this file as tab-indented JSON by delegating to {@link #toJson(JsonWriter, boolean)}.
 *
 * @param includeMetadata passed through to {@link #toJson(JsonWriter, boolean)} to control whether the
 *        additional metadata is written
 * @return pretty-printed JSON, optionally including metadata; if an {@link IOException} is raised while
 *         writing, whatever text had been produced up to that point
 */
public String toJsonString(boolean includeMetadata) {
  final StringWriter buffer = new StringWriter();
  try (JsonWriter writer = new JsonWriter(buffer)) {
    writer.setIndent("\t");
    toJson(writer, includeMetadata);
  } catch (IOException ignored) {
    // Intentionally ignored: the text accumulated so far is returned as-is.
  }
  return buffer.toString();
}

/**
 * Writes this file to {@code jsonWriter} as one JSON object.
 *
 * <p>The object holds {@code "file set"}, {@code origin}, {@code destination},
 * {@code destinationOwnerAndPermission} and {@code datasetOutputPath}, each as the string form of the
 * corresponding getter's value. A getter that returns {@code null} is written as a JSON null rather than
 * failing with a {@link NullPointerException}. When {@code includeMetadata} is set and the additional
 * metadata is non-null, a {@code metadata} object with one member per entry follows.
 *
 * @param jsonWriter the writer to emit to; it is not closed by this method
 * @param includeMetadata whether to append the additional metadata (when there is any)
 * @throws IOException if {@code jsonWriter} fails while writing
 */
public void toJson(JsonWriter jsonWriter, boolean includeMetadata) throws IOException {
  jsonWriter.beginObject();

  jsonWriter
      .name("file set").value(this.getFileSet())
      .name("origin").value(toStringOrNull(this.getOrigin()))
      .name("destination").value(toStringOrNull(this.getDestination()))
      .name("destinationOwnerAndPermission").value(toStringOrNull(this.getDestinationOwnerAndPermission()))
      // TODO:
      // this.ancestorsOwnerAndPermission
      // this.checksum
      // this.preserve
      // this.dataFileVersionStrategy
      // this.originTimestamp
      // this.upstreamTimestamp
      .name("datasetOutputPath").value(toStringOrNull(this.getDatasetOutputPath()));

  if (includeMetadata && this.getAdditionalMetadata() != null) {
    jsonWriter.name("metadata");
    jsonWriter.beginObject();
    for (Map.Entry<String, String> entry : this.getAdditionalMetadata().entrySet()) {
      jsonWriter.name(entry.getKey()).value(entry.getValue());
    }
    jsonWriter.endObject();
  }

  jsonWriter.endObject();
}

/** @return {@code value.toString()}, or {@code null} (which the JSON writer emits as a null literal) when unset */
private static String toStringOrNull(Object value) {
  return value == null ? null : value.toString();
}

/**
* Set file system based source and destination dataset for this {@link CopyableFile}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {

public static final String JOB_STATE_FILE_NAME = "job.state";

public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";

public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";

public static final String NUM_WORKUNITS = "numWorkUnits";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;


Expand Down Expand Up @@ -140,10 +139,9 @@ public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker<Gobb
"A job must have a job name specified by job.name");

this.jobName = JobState.getJobNameFromProps(jobProps);
this.jobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY) ? jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)
: JobLauncherUtils.newJobId(this.jobName);
this.jobId = JobState.getJobIdFromProps(jobProps);
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId); // in case not yet directly defined as such
this.jobSequence = Long.toString(Id.Job.parse(this.jobId).getSequence());
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId);

this.jobBroker = instanceBroker.newSubscopedBuilder(new JobScopeInstance(this.jobName, this.jobId))
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
Expand Down
Loading