diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index c0578a99703..cf324b44342 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -462,9 +462,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException { } // TODO: Better error handling. The current impl swallows exceptions for jobs that were started by this method call. - // One potential way to improve the error handling is to make this error swallowing conifgurable + // One potential way to improve the error handling is to make this error swallowing configurable } catch (Throwable t) { errorInJobLaunching = t; + if (isLaunched) { + // Attempts to cancel the helix workflow if an error occurs during launch + cancelJob(jobListener); + } } finally { if (isLaunched) { if (this.runningMap.replace(this.jobContext.getJobName(), true, false)) { diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 975dad38b79..d475688e5a0 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -38,6 +38,7 @@ import org.apache.helix.task.TaskDriver; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -63,6 +64,7 @@ import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.listeners.AbstractJobListener; +import 
org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; @@ -299,6 +301,35 @@ public void testTimeout() throws Exception { Assert.assertThrows(JobException.class, () -> gobblinHelixJobLauncher.launchJobImpl(null)); } + public void testCancelJobOnFailureDuringLaunch() throws Exception { + final ConcurrentHashMap runningMap = new ConcurrentHashMap<>(); + final Properties props = generateJobProperties(this.baseConfig, "testDoesCancelOnFailure", "_12345"); + props.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS, "0"); + + final GobblinHelixJobLauncher gobblinHelixJobLauncher = this.closer.register( + new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir, ImmutableList.>of(), runningMap, + java.util.Optional.empty())); + + // The launchJob will throw an exception (see testTimeout test) and we expect the launcher to swallow the exception, + // then still properly call cancel. 
We use the listener to confirm the cancel hook was correctly called once + JobListener mockListener = Mockito.mock(JobListener.class); + gobblinHelixJobLauncher.launchJob(mockListener); + Mockito.verify(mockListener).onJobCancellation(Mockito.any(JobContext.class)); + } + + public void testNoCancelWhenJobCompletesSuccessfully() throws Exception { + final ConcurrentHashMap runningMap = new ConcurrentHashMap<>(); + final Properties props = generateJobProperties(this.baseConfig, "testDoesNotCancelOnSuccess", "_12345"); + final GobblinHelixJobLauncher gobblinHelixJobLauncher = this.closer.register( + new GobblinHelixJobLauncher(props, this.helixManager, this.appWorkDir, ImmutableList.>of(), runningMap, + java.util.Optional.empty())); + + // When the job finishes successfully, the cancellation hook should not be invoked + JobListener mockListener = Mockito.mock(JobListener.class); + gobblinHelixJobLauncher.launchJob(mockListener); + Mockito.verify(mockListener, Mockito.never()).onJobCancellation(Mockito.any(JobContext.class)); + } + @Test(enabled = false, dependsOnMethods = {"testLaunchJob", "testLaunchMultipleJobs"}) public void testJobCleanup() throws Exception { final ConcurrentHashMap runningMap = new ConcurrentHashMap<>();