diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 6d36d9ff3e3..a50ba8c75eb 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -725,7 +725,7 @@ public class ConfigurationKeys { public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100; public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false; public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL = false; - public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false"; + public static final boolean DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = false; /** * Configuration properties used by the distributed job launcher. diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java index d254de06516..1392fa97f4c 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java @@ -55,6 +55,11 @@ public MultiWorkUnit() { super(); } + @Override + public boolean isMultiWorkUnit() { + return true; + } + /** * Get an immutable list of {@link WorkUnit}s wrapped by this {@link MultiWorkUnit}. * diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java index bf38c35251f..139e60c018f 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java @@ -24,10 +24,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringWriter; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.stream.JsonWriter; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.extractor.Watermark; @@ -134,6 +136,11 @@ public WorkUnit(WorkUnit other) { this.extract = other.getExtract(); } + /** @return whether a multi-work-unit (or else a singular one) */ + public boolean isMultiWorkUnit() { + return false; // more efficient than `this instanceof MultiWorkUnit` plus no circular dependency + } + /** * Factory method. * @@ -365,6 +372,57 @@ public int hashCode() { return result; } + /** @return pretty-printed JSON, including all properties */ + public String toJsonString() { + StringWriter stringWriter = new StringWriter(); + try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) { + jsonWriter.setIndent("\t"); + this.toJson(jsonWriter); + } catch (IOException ioe) { + // Ignored + } + return stringWriter.toString(); + } + + public void toJson(JsonWriter jsonWriter) throws IOException { + jsonWriter.beginObject(); + + jsonWriter.name("id").value(this.getId()); + jsonWriter.name("properties"); + jsonWriter.beginObject(); + for (String key : this.getPropertyNames()) { + jsonWriter.name(key).value(this.getProp(key)); + } + jsonWriter.endObject(); + + jsonWriter.name("extract"); + jsonWriter.beginObject(); + jsonWriter.name("extractId").value(this.getExtract().getId()); + jsonWriter.name("extractProperties"); + jsonWriter.beginObject(); + for (String key : this.getExtract().getPropertyNames()) { + jsonWriter.name(key).value(this.getExtract().getProp(key)); + } + jsonWriter.endObject(); + + State prevTableState = this.getExtract().getPreviousTableState(); + if (prevTableState != null) { + jsonWriter.name("extractPrevTableState"); + jsonWriter.beginObject(); + jsonWriter.name("prevStateId").value(prevTableState.getId()); + jsonWriter.name("prevStateProperties"); + jsonWriter.beginObject(); + for (String key : prevTableState.getPropertyNames()) { + jsonWriter.name(key).value(prevTableState.getProp(key)); + } + jsonWriter.endObject(); + jsonWriter.endObject(); + } + jsonWriter.endObject(); + + jsonWriter.endObject(); + } + public String getOutputFilePath() { // Search for the properties in the workunit. // This search for the property first in State and then in the Extract of this workunit. diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index cf324b44342..f0fcc258f24 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -71,7 +71,6 @@ import org.apache.gobblin.runtime.TaskStateCollectorService; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.runtime.util.StateStores; -import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JobLauncherUtils; @@ -110,8 +109,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class); - private static final String WORK_UNIT_FILE_EXTENSION = ".wu"; - private final HelixManager helixManager; private final TaskDriver helixTaskDriver; private final String helixWorkFlowName; @@ -345,7 +342,7 @@ JobConfig.Builder createHelixJob(List workUnits) throws IOException { try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) { int multiTaskIdSequence = 0; for (WorkUnit workUnit : workUnits) { - if (workUnit instanceof MultiWorkUnit) { + if (workUnit.isMultiWorkUnit()) { workUnit.setId(JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++)); } addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap); @@ -535,15 +532,12 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stateSerDeRunner) { String workUnitFilePath = workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH); - final StateStore stateStore; Path workUnitFile = new Path(workUnitFilePath); final String fileName = workUnitFile.getName(); final String storeName = workUnitFile.getParent().getName(); - if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) { - stateStore = stateStores.getMwuStateStore(); - } else { - stateStore = stateStores.getWuStateStore(); - } + final StateStore stateStore = JobLauncherUtils.hasMultiWorkUnitExtension(workUnitFile) + ? stateStores.getMwuStateStore() + : stateStores.getWuStateStore(); stateSerDeRunner.submitCallable(new Callable() { @Override public Void call() throws Exception { @@ -561,11 +555,11 @@ private String persistWorkUnit(final Path workUnitFileDir, final WorkUnit workUn final StateStore stateStore; String workUnitFileName = workUnit.getId(); - if (workUnit instanceof MultiWorkUnit) { - workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION; + if (workUnit.isMultiWorkUnit()) { + workUnitFileName += JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; stateStore = stateStores.getMwuStateStore(); } else { - workUnitFileName += WORK_UNIT_FILE_EXTENSION; + workUnitFileName += JobLauncherUtils.WORK_UNIT_FILE_EXTENSION; stateStore = stateStores.getWuStateStore(); } diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java index 93caab41c73..5d67c9f510e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java @@ -39,7 +39,6 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; import org.apache.gobblin.broker.iface.SharedResourcesBroker; -import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.util.StateStores; @@ -184,7 +183,7 @@ protected List getWorkUnits() WorkUnit workUnit; try { - if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { + if (JobLauncherUtils.hasMultiWorkUnitExtension(_workUnitFilePath)) { workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0); } else { workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index 85fa80f0fe8..9e9af05e3cf 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -17,8 +17,8 @@ package org.apache.gobblin.data.management.copy; -import com.google.common.cache.Cache; import java.io.IOException; +import java.io.StringWriter; import java.util.List; import java.util.Map; @@ -34,8 +34,10 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.stream.JsonWriter; import lombok.AccessLevel; import lombok.EqualsAndHashCode; @@ -132,6 +134,52 @@ public CopyableFile(FileStatus origin, Path destination, OwnerAndPermission dest this.datasetOutputPath = datasetOutputPath; } + /** @return pretty-printed JSON, including all metadata */ + public String toJsonString() { + return toJsonString(true); + } + + /** @return pretty-printed JSON, optionally including metadata */ + public String toJsonString(boolean includeMetadata) { + StringWriter stringWriter = new StringWriter(); + try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) { + jsonWriter.setIndent("\t"); + this.toJson(jsonWriter, includeMetadata); + } catch (IOException ioe) { + // Ignored + } + return stringWriter.toString(); + } + + public void toJson(JsonWriter jsonWriter, boolean includeMetadata) throws IOException { + jsonWriter.beginObject(); + + jsonWriter + .name("file set").value(this.getFileSet()) + .name("origin").value(this.getOrigin().toString()) + .name("destination").value(this.getDestination().toString()) + .name("destinationOwnerAndPermission").value(this.getDestinationOwnerAndPermission().toString()) + // TODO: + // this.ancestorsOwnerAndPermission + // this.checksum + // this.preserve + // this.dataFileVersionStrategy + // this.originTimestamp + // this.upstreamTimestamp + .name("datasetOutputPath").value(this.getDatasetOutputPath().toString()); + + if (includeMetadata && this.getAdditionalMetadata() != null) { + jsonWriter.name("metadata"); + jsonWriter.beginObject(); + for (Map.Entry entry : this.getAdditionalMetadata().entrySet()) { + jsonWriter.name(entry.getKey()).value(entry.getValue()); + } + jsonWriter.endObject(); + } + + jsonWriter.endObject(); + } + /** * Set file system based source and destination dataset for this {@link CopyableFile} * diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index 3f2570c68f5..bf86e628e45 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -129,9 +129,6 @@ public abstract class AbstractJobLauncher implements JobLauncher { public static final String JOB_STATE_FILE_NAME = "job.state"; - public static final String WORK_UNIT_FILE_EXTENSION = ".wu"; - public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu"; - public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri"; public static final String NUM_WORKUNITS = "numWorkUnits"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java index 5c6eb3eae9c..658b308b79a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java @@ -74,7 +74,6 @@ import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.Id; -import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.executors.IteratorExecutor; @@ -140,10 +139,9 @@ public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker taskStates = Maps.newLinkedHashMap(); // Skipped task states shouldn't be exposed to publisher, but they need to be in JobState and DatasetState so that they can be written to StateStore. @@ -149,7 +163,7 @@ public JobState(String jobName, String jobId) { this.setId(jobId); } - public JobState(State properties,String jobName, String jobId) { + public JobState(State properties, String jobName, String jobId) { super(properties); this.jobName = jobName; this.jobId = jobId; @@ -172,6 +186,11 @@ public static String getJobNameFromProps(Properties props) { return props.getProperty(ConfigurationKeys.JOB_NAME_KEY); } + public static String getJobIdFromProps(Properties props) { + return props.containsKey(ConfigurationKeys.JOB_ID_KEY) ? props.getProperty(ConfigurationKeys.JOB_ID_KEY) + : JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props)); + } + public static String getJobGroupFromState(State state) { return state.getProp(ConfigurationKeys.JOB_GROUP_KEY); } @@ -188,69 +207,6 @@ public static String getJobDescriptionFromProps(Properties props) { return props.getProperty(ConfigurationKeys.JOB_DESCRIPTION_KEY); } - /** - * Get job name. - * - * @return job name - */ - public String getJobName() { - return this.jobName; - } - - /** - * Set job name. - * - * @param jobName job name - */ - public void setJobName(String jobName) { - this.jobName = jobName; - } - - /** - * Get job ID. - * - * @return job ID - */ - public String getJobId() { - return this.jobId; - } - - /** - * Set job ID. - * - * @param jobId job ID - */ - public void setJobId(String jobId) { - this.jobId = jobId; - } - - /** - * Get job start time. - * - * @return job start time - */ - public long getStartTime() { - return this.startTime; - } - - /** - * Set job start time. - * - * @param startTime job start time - */ - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - /** - * Get job end time. - * - * @return job end time - */ - public long getEndTime() { - return this.endTime; - } - /** * Get the currently elapsed time for this job. * @return @@ -265,33 +221,6 @@ public long getElapsedTime() { return 0; } - /** - * Set job end time. - * - * @param endTime job end time - */ - public void setEndTime(long endTime) { - this.endTime = endTime; - } - - /** - * Get job duration in milliseconds. - * - * @return job duration in milliseconds - */ - public long getDuration() { - return this.duration; - } - - /** - * Set job duration in milliseconds. - * - * @param duration job duration in milliseconds - */ - public void setDuration(long duration) { - this.duration = duration; - } - /** * Get job running state of type {@link RunningState}. * @@ -310,24 +239,6 @@ public synchronized void setState(RunningState state) { this.state = state; } - /** - * Get the number of tasks this job consists of. - * - * @return number of tasks this job consists of - */ - public int getTaskCount() { - return this.taskCount; - } - - /** - * Set the number of tasks this job consists of. - * - * @param taskCount number of tasks this job consists of - */ - public void setTaskCount(int taskCount) { - this.taskCount = taskCount; - } - /** * If not already present, set the {@link ConfigurationKeys#JOB_FAILURE_EXCEPTION_KEY} to a {@link String} * representation of the given {@link Throwable}. @@ -682,12 +593,23 @@ public int hashCode() { return result; } + /** @return pretty-printed JSON, without including properties */ @Override public String toString() { + return toJsonString(false); + } + + /** @return pretty-printed JSON, including all properties */ + public String toJsonString() { + return toJsonString(true); + } + + /** @return pretty-printed JSON, optionally including properties */ + public String toJsonString(boolean includeProperties) { StringWriter stringWriter = new StringWriter(); try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) { jsonWriter.setIndent("\t"); - this.toJson(jsonWriter, false); + this.toJson(jsonWriter, includeProperties); } catch (IOException ioe) { // Ignored } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java index 903d94b18ce..60c4752038a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java @@ -20,6 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringWriter; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; @@ -40,6 +41,7 @@ import javax.annotation.Nullable; import lombok.Getter; +import lombok.Setter; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; @@ -89,15 +91,27 @@ public class TaskState extends WorkUnitState implements TaskProgress { */ private static final String BYTES_PER_SECOND = "bytesPerSec"; + /** ID of the job this {@link TaskState} is for */ + @Getter @Setter private String jobId; + /** ID of the task this {@link TaskState} is for */ + @Getter @Setter private String taskId; + /** sequence number of the task this {@link TaskState} is for */ + @Getter private String taskKey; @Getter private Optional taskAttemptId; + /** task start time in milliseconds */ + @Getter @Setter private long startTime = 0; + /** task end time in milliseconds */ + @Getter @Setter private long endTime = 0; - private long duration; + /** task duration in milliseconds */ + @Getter @Setter + private long taskDuration; // Needed for serialization/deserialization public TaskState() {} @@ -123,105 +137,6 @@ public TaskState(TaskState taskState) { this.setId(this.taskId); } - /** - * Get the ID of the job this {@link TaskState} is for. - * - * @return ID of the job this {@link TaskState} is for - */ - public String getJobId() { - return this.jobId; - } - - /** - * Set the ID of the job this {@link TaskState} is for. - * - * @param jobId ID of the job this {@link TaskState} is for - */ - public void setJobId(String jobId) { - this.jobId = jobId; - } - - /** - * Get the sequence number of the task this {@link TaskState} is for. - * - * @return Sequence number of the task this {@link TaskState} is for - */ - public String getTaskKey() { - return this.taskKey; - } - - /** - * Get the ID of the task this {@link TaskState} is for. - * - * @return ID of the task this {@link TaskState} is for - */ - public String getTaskId() { - return this.taskId; - } - - /** - * Set the ID of the task this {@link TaskState} is for. - * - * @param taskId ID of the task this {@link TaskState} is for - */ - public void setTaskId(String taskId) { - this.taskId = taskId; - } - - /** - * Get task start time in milliseconds. - * - * @return task start time in milliseconds - */ - public long getStartTime() { - return this.startTime; - } - - /** - * Set task start time in milliseconds. - * - * @param startTime task start time in milliseconds - */ - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - /** - * Get task end time in milliseconds. - * - * @return task end time in milliseconds - */ - public long getEndTime() { - return this.endTime; - } - - /** - * set task end time in milliseconds. - * - * @param endTime task end time in milliseconds - */ - public void setEndTime(long endTime) { - this.endTime = endTime; - } - - /** - * Get task duration in milliseconds. - * - * @return task duration in milliseconds - */ - public long getTaskDuration() { - return this.duration; - } - - /** - * Set task duration in milliseconds. - * - * @param duration task duration in milliseconds - */ - public void setTaskDuration(long duration) { - this.duration = duration; - } - /** * Get the {@link ConfigurationKeys#TASK_FAILURE_EXCEPTION_KEY} if it exists, else return {@link Optional#absent()}. */ @@ -348,7 +263,7 @@ public void readFields(DataInput in) throws IOException { this.setId(this.taskId); this.startTime = in.readLong(); this.endTime = in.readLong(); - this.duration = in.readLong(); + this.taskDuration = in.readLong(); super.readFields(in); } @@ -361,7 +276,7 @@ public void write(DataOutput out) throws IOException { text.write(out); out.writeLong(this.startTime); out.writeLong(this.endTime); - out.writeLong(this.duration); + out.writeLong(this.taskDuration); super.write(out); } @@ -384,6 +299,23 @@ public int hashCode() { return result; } + /** @return pretty-printed JSON, including all properties */ + public String toJsonString() { + return toJsonString(true); + } + + /** @return pretty-printed JSON, optionally including properties */ + public String toJsonString(boolean includeProperties) { + StringWriter stringWriter = new StringWriter(); + try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) { + jsonWriter.setIndent("\t"); + this.toJson(jsonWriter, includeProperties); + } catch (IOException ioe) { + // Ignored + } + return stringWriter.toString(); + } + /** * Convert this {@link TaskState} to a json document. * @@ -432,7 +364,7 @@ public TaskExecutionInfo toTaskExecutionInfo() { if (this.endTime > 0) { taskExecutionInfo.setEndTime(this.endTime); } - taskExecutionInfo.setDuration(this.duration); + taskExecutionInfo.setDuration(this.taskDuration); taskExecutionInfo.setState(TaskStateEnum.valueOf(getWorkingState().name())); if (this.contains(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)) { taskExecutionInfo.setFailureException(this.getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java index 65401c03d65..57ba0936d79 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java @@ -40,7 +40,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; -import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.source.workunit.MultiWorkUnit; @@ -89,28 +88,18 @@ public void abortJob(JobContext jobContext, JobStatus.State state) throws IOExce Closer workUnitFileCloser = Closer.create(); - // If the file ends with ".wu" de-serialize it into a WorkUnit - if (status.getPath().getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION)) { - WorkUnit wu = WorkUnit.createEmpty(); - try { - wu.readFields(workUnitFileCloser.register(new DataInputStream(fs.open(status.getPath())))); - } finally { - workUnitFileCloser.close(); - } - JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG); + WorkUnit wu = JobLauncherUtils.createEmptyWorkUnitPerExtension(status.getPath()); + try { + wu.readFields(workUnitFileCloser.register(new DataInputStream(fs.open(status.getPath())))); + } finally { + workUnitFileCloser.close(); } - - // If the file ends with ".mwu" de-serialize it into a MultiWorkUnit - if (status.getPath().getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { - MultiWorkUnit mwu = MultiWorkUnit.createEmpty(); - try { - mwu.readFields(workUnitFileCloser.register(new DataInputStream(fs.open(status.getPath())))); - } finally { - workUnitFileCloser.close(); - } - for (WorkUnit wu : mwu.getWorkUnits()) { - JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG); + if (wu instanceof MultiWorkUnit) { + for (WorkUnit eachWU : ((MultiWorkUnit) wu).getWorkUnits()) { + JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(eachWU), LOG); } + } else { + JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG); } } } finally { @@ -173,8 +162,7 @@ private static void cleanUpWorkingDirectory(Path mrJobDir, FileSystem fs) throws private static class WorkUnitFilter implements PathFilter { @Override public boolean accept(Path path) { - return path.getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION) - || path.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION); + return JobLauncherUtils.hasAnyWorkUnitExtension(path); } } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index cd34fe1b5d6..1893cebd71a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -56,7 +57,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -523,19 +523,18 @@ private void prepareHadoopJob(List workUnits) throws IOException { mrJobSetupTimer.stop(); } - static boolean isSpeculativeExecutionEnabled(Properties props) { - return Boolean.valueOf( - props.getProperty(JobContext.MAP_SPECULATIVE, ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION)); + static boolean isBooleanPropEnabled(Properties props, String propKey, Optional optDefault) { + return (props.containsKey(propKey) && Boolean.parseBoolean(props.getProperty(propKey))) + || (optDefault.isPresent() && optDefault.get()); } - static boolean isCustomizedProgressReportEnabled(Properties properties) { - return properties.containsKey(ENABLED_CUSTOMIZED_PROGRESS) - && Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS)); + static boolean isSpeculativeExecutionEnabled(Properties props) { + return isBooleanPropEnabled(props, JobContext.MAP_SPECULATIVE, + Optional.of(ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION)); } - static boolean isBooleanPropEnabled(Properties props, String propKey, Optional optDefault) { - return (props.containsKey(propKey) && Boolean.parseBoolean(props.getProperty(propKey))) - || (optDefault.isPresent() && optDefault.get()); + static boolean isCustomizedProgressReportEnabled(Properties properties) { + return isBooleanPropEnabled(properties, ENABLED_CUSTOMIZED_PROGRESS, Optional.empty()); } static boolean isMapperFailureFatalEnabled(Properties props) { @@ -688,11 +687,11 @@ private void prepareJobInput(List workUnits) throws IOException { for (WorkUnit workUnit : workUnits) { String workUnitFileName; - if (workUnit instanceof MultiWorkUnit) { + if (workUnit.isMultiWorkUnit()) { workUnitFileName = JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++) - + MULTI_WORK_UNIT_FILE_EXTENSION; + + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; } else { - workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + WORK_UNIT_FILE_EXTENSION; + workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION; } Path workUnitFile = new Path(this.jobInputPath, workUnitFileName); LOG.debug("Writing work unit file " + workUnitFileName); @@ -731,7 +730,7 @@ private void cleanUpWorkingDirectory() { */ @VisibleForTesting void countersToMetrics(GobblinMetrics metrics) throws IOException { - Optional counters = Optional.fromNullable(this.job.getCounters()); + Optional counters = Optional.ofNullable(this.job.getCounters()); if (counters.isPresent()) { // Write job-level counters @@ -772,7 +771,7 @@ public static class TaskRunner extends Mapper jobMetrics = Optional.absent(); + private Optional jobMetrics = Optional.empty(); private boolean isSpeculativeEnabled; private boolean customizedProgressEnabled; private final JobState jobState = new JobState(); @@ -809,25 +808,18 @@ protected void setup(Context context) { this.fs = FileSystem.get(context.getConfiguration()); this.taskStateStore = new FsStateStore<>(this.fs, FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class); - String jobStateFileName = context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME); - boolean foundStateFile = false; - for (Path dcPath : DistributedCache.getLocalCacheFiles(context.getConfiguration())) { - if (dcPath.getName().equals(jobStateFileName)) { - SerializationUtils.deserializeStateFromInputStream( - closer.register(new FileInputStream(dcPath.toUri().getPath())), this.jobState); - foundStateFile = true; - break; - } - } - if (!foundStateFile) { - throw new IOException("Job state file not found."); + Optional jobStateFileUri = getStateFileUriForJob(context.getConfiguration(), jobStateFileName); + if (jobStateFileUri.isPresent()) { + SerializationUtils.deserializeStateFromInputStream( + closer.register(new FileInputStream(jobStateFileUri.get().getPath())), this.jobState); + } else { + throw new IOException("Job state file not found: '" + jobStateFileName + "'."); } } catch (IOException | ReflectiveOperationException e) { throw new RuntimeException("Failed to setup the mapper task", e); } - // load dynamic configuration to add to the job configuration Configuration configuration = context.getConfiguration(); Config jobStateAsConfig = ConfigUtils.propertiesToConfig(this.jobState.getProperties()); @@ -989,17 +981,17 @@ public void execute() throws IOException { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - WorkUnit workUnit = (value.toString().endsWith(MULTI_WORK_UNIT_FILE_EXTENSION) ? MultiWorkUnit.createEmpty() - : WorkUnit.createEmpty()); - SerializationUtils.deserializeState(this.fs, new Path(value.toString()), workUnit); - - if (workUnit instanceof MultiWorkUnit) { - List flattenedWorkUnits = - JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); - this.workUnits.addAll(flattenedWorkUnits); - } else { - this.workUnits.add(workUnit); + this.workUnits.addAll(JobLauncherUtils.loadFlattenedWorkUnits(this.fs, new Path(value.toString()))); + } + + /** @return {@link URI} if a distributed cache file matches `jobStateFileName` */ + protected Optional getStateFileUriForJob(Configuration conf, String jobStateFileName) throws IOException { + for (Path dcPath : DistributedCache.getLocalCacheFiles(conf)) { + if (dcPath.getName().equals(jobStateFileName)) { + return Optional.of(dcPath.toUri()); + } } + return Optional.empty(); } /** diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java index 42fae521e8e..1f20b1b7334 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java @@ -55,6 +55,9 @@ @Slf4j public class JobLauncherUtils { + public static final String WORK_UNIT_FILE_EXTENSION = ".wu"; + public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu"; + // A cache for proxied FileSystems by owners private static Cache fileSystemCacheByOwners = CacheBuilder.newBuilder().build(); @@ -114,7 +117,7 @@ public static String newMultiTaskId(String jobId, int sequence) { public static List flattenWorkUnits(Collection workUnits) { List flattenedWorkUnits = Lists.newArrayList(); for (WorkUnit workUnit : workUnits) { - if (workUnit instanceof MultiWorkUnit) { + if (workUnit.isMultiWorkUnit()) { flattenedWorkUnits.addAll(flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits())); } else { flattenedWorkUnits.add(workUnit); @@ -123,6 +126,34 @@ public static List flattenWorkUnits(Collection workUnits) { return flattenedWorkUnits; } + /** @return flattened list of {@link WorkUnit}s loaded from `path`, which may possibly hold a multi-work unit */ + public static List loadFlattenedWorkUnits(FileSystem fs, Path path) throws IOException { + WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(path); + SerializationUtils.deserializeState(fs, path, workUnit); + + if (workUnit.isMultiWorkUnit()) { + return JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); + } else { + return Lists.newArrayList(workUnit); + } + } + + /** @return an empty {@link WorkUnit}, potentially an empty {@link MultiWorkUnit}, based on the {@link Path} extension */ + public static WorkUnit createEmptyWorkUnitPerExtension(Path p) { + return JobLauncherUtils.hasMultiWorkUnitExtension(p) ? MultiWorkUnit.createEmpty() : WorkUnit.createEmpty(); + } + + /** @return whether {@link Path} ends with {@link JobLauncherUtils#MULTI_WORK_UNIT_FILE_EXTENSION} */ + public static boolean hasMultiWorkUnitExtension(Path p) { + return p.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION); + } + + /** @return whether {@link Path} ends with {@link JobLauncherUtils#MULTI_WORK_UNIT_FILE_EXTENSION} or {@link JobLauncherUtils#WORK_UNIT_FILE_EXTENSION} */ + public static boolean hasAnyWorkUnitExtension(Path p) { + return p.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION) + || p.getName().endsWith(JobLauncherUtils.WORK_UNIT_FILE_EXTENSION); + } + /** * Cleanup the staging data for a list of Gobblin tasks. This method calls the * {@link #cleanTaskStagingData(State, Logger)} method.