Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";

// If true, tasks that completed successfully still commit their data, but the job as a whole
// is marked failed when any task fails (see the usage in JobContext). Defaults to false.
public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit";
// If true, commit of different datasets will be performed in parallel
// only turn on if publisher is thread-safe
public static final String PARALLELIZE_DATASET_COMMIT = "job.commit.parallelize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public class JobContext implements Closeable {
private final JobState jobState;
@Getter(AccessLevel.PACKAGE)
private final JobCommitPolicy jobCommitPolicy;
// A job commit semantic where partially successful tasks commit their data, but the job as a
// whole is still marked failed.
// WARNING: intended only for Gobblin jobs that do not record watermarks; for such jobs the
// partial commit presumably cannot lead to duplicate work on re-run — confirm before enabling
// on a watermark-recording job.
@Getter(AccessLevel.PACKAGE)
private final boolean partialFailTaskFailsJobCommit;

private final Optional<JobMetrics> jobMetricsOptional;
private final Source<?, ?> source;

Expand Down Expand Up @@ -146,6 +151,7 @@ public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker<Gobb
this.jobBroker = instanceBroker.newSubscopedBuilder(new JobScopeInstance(this.jobName, this.jobId))
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
this.jobCommitPolicy = JobCommitPolicy.getCommitPolicy(jobProps);
this.partialFailTaskFailsJobCommit = Boolean.valueOf(jobProps.getProperty(ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT, "false"));

this.datasetStateStore = createStateStore(ConfigUtils.propertiesToConfig(jobProps));
this.jobHistoryStoreOptional = createJobHistoryStore(jobProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private void finalizeDatasetState(JobState.DatasetState datasetState, String dat
// Backoff the actual high watermark to the low watermark for each task that has not been committed
if (taskState.getWorkingState() != WorkUnitState.WorkingState.COMMITTED) {
taskState.backoffActualHighWatermark();
if (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
if (this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS || this.jobContext.isPartialFailTaskFailsJobCommit()) {
// Determine the final dataset state based on the task states (post commit) and the job commit policy.
// 1. If COMMIT_ON_FULL_SUCCESS is used, the processing of the dataset is considered failed if any
// task for the dataset failed to be committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,40 @@ public void runTestWithCommitSuccessfulTasksPolicy(Properties jobProps) throws E
}
}

/**
 * Runs a job under the "successful" commit policy with
 * {@code ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT} enabled, and verifies that
 * successful tasks commit their data while the job as a whole is reported FAILED.
 *
 * @param jobProps base job properties; mutated in place with the settings for this scenario
 * @throws Exception on any unexpected failure while running or inspecting the job
 */
public void runTestWithCommitSuccessfulTasksPolicyAndFailJob(Properties jobProps) throws Exception {
  String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
  String jobId = JobLauncherUtils.newJobId(jobName).toString();
  jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
  jobProps.setProperty(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, Boolean.FALSE.toString());
  jobProps.setProperty(ConfigurationKeys.JOB_COMMIT_POLICY_KEY, "successful");
  // The faulty extractor makes one task fail; zero retries so the failure sticks.
  jobProps.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, TestSourceWithFaultyExtractor.class.getName());
  jobProps.setProperty(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0");
  jobProps.setProperty(ConfigurationKeys.PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT, "true");

  Closer closer = Closer.create();
  try {
    JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(this.launcherProps, jobProps));
    jobLauncher.launchJob(null);
    // BUG FIX: without this, the test silently passed when launchJob did NOT throw,
    // skipping every assertion in the catch block below.
    Assert.fail("Expected launchJob to throw a JobException because the job commit should fail");
  } catch (JobException e) {
    List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst");
    JobState jobState = datasetStateList.get(0);

    // The job must be FAILED even though all but one task committed.
    Assert.assertEquals(jobState.getState(), JobState.RunningState.FAILED);
    Assert.assertEquals(jobState.getCompletedTasks(), 4);
    for (TaskState taskState : jobState.getTaskStates()) {
      if (taskState.getTaskId().endsWith("0")) {
        // Task 0 is the one driven by the faulty extractor.
        Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.FAILED);
      } else {
        // Every other task still committed its full record set under the "successful" policy.
        Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
        Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
            TestExtractor.TOTAL_RECORDS);
      }
    }
  } finally {
    closer.close();
  }
}

public void runTestWithMultipleDatasetsAndFaultyExtractor(Properties jobProps, boolean usePartialCommitPolicy)
throws Exception {
String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,19 @@ public void testLaunchJobWithCommitSuccessfulTasksPolicy() throws Exception {
}
}

@Test
public void testLaunchJobWithCommitSuccessfulTasksPolicyAndFailJob() throws Exception {
  Properties jobProps = loadJobProps();
  // Suffix the job name so this test's state store entries do not collide with other tests'.
  String suffixedJobName =
      jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + "-testLaunchJobWithCommitSuccessfulTasksPolicyAndFailJob";
  jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, suffixedJobName);
  try {
    this.jobLauncherTestHelper.runTestWithCommitSuccessfulTasksPolicyAndFailJob(jobProps);
  } finally {
    // Always clean up the state store, even when the helper's assertions fail.
    this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
  }
}


@Test
public void testLaunchJobWithMultipleDatasetsAndFaultyExtractor() throws Exception {
Properties jobProps = loadJobProps();
Expand Down