diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 927909e57dd..ff57745c7e0 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -60,6 +60,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.FlowSpecSearchObject; +import org.apache.gobblin.runtime.api.LeaseUnavailableException; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; @@ -256,7 +257,10 @@ public CreateKVResponse, FlowConfig> cr responseMap = this.flowCatalog.put(flowSpec, true); } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } catch (Throwable e) { + } catch(LeaseUnavailableException e){ + throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage()); + } + catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java new file mode 100644 index 00000000000..8bdf92b8f63 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java @@ -0,0 +1,7 @@ +package org.apache.gobblin.runtime.api; + +public class LeaseUnavailableException extends RuntimeException { + public LeaseUnavailableException(String message) { + super(message); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index fb7b23fdf0a..8fe0eac9f01 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -103,6 +103,11 @@ public interface DagManagementStateStore { */ void updateDagNode(Dag.DagNode dagNode) throws IOException; + /** + * Returns true if lease can be acquired on entity, else returns false + */ + boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException; + /** * Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}. * Both params are returned as optional and are empty if not present in the store. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java index 9e1c270c493..67042f75955 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java @@ -90,6 +90,11 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus (%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams)); } + @Override + public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + return decoratedMultiActiveLeaseArbiter.existsLeasableEntity(leaseParams); + } + @Override public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status) throws IOException { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java index c9a3b152bf8..2da76f29249 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java @@ -61,6 +61,9 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) + throws IOException; + /** * This method is used to indicate the owner of the lease has successfully completed required actions while holding * the lease of the dag action event. It marks the lease as "no longer leasing", if the eventTimeMillis and diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index 29e652cce8e..cef538570a7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { // todo - these two stores should merge private DagStateStoreWithDagNodes dagStateStore; private DagStateStoreWithDagNodes failedDagStateStore; + private MultiActiveLeaseArbiter multiActiveLeaseArbiter; private final JobStatusRetriever jobStatusRetriever; private boolean dagStoresInitialized = false; private final UserQuotaManager quotaManager; @@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { @Inject public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager, - JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) { + JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore, MultiActiveLeaseArbiter multiActiveLeaseArbiter) { this.quotaManager = userQuotaManager; this.config = config; this.flowCatalog = flowCatalog; this.jobStatusRetriever = jobStatusRetriever; this.dagManagerMetrics.activate(); this.dagActionStore = dagActionStore; + this.multiActiveLeaseArbiter = multiActiveLeaseArbiter; } // It should be called after topology spec map is set @@ -168,6 +170,11 @@ public synchronized void updateDagNode(Dag.DagNode dagNode) this.dagStateStore.updateDagNode(dagNode); } + @Override + public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + return multiActiveLeaseArbiter.existsLeasableEntity(leaseParams); + } + @Override public Optional> getDag(Dag.DagId dagId) throws IOException { return Optional.ofNullable(this.dagStateStore.getDag(dagId)); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index 48112790481..8cb930f1dd4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) { } } + @Override + public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException { + Optional infoResult = getExistingEventInfo(leaseParams); + return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true; + } + /** * Checks leaseArbiterTable for an existing entry for this dag action and event time */ diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index ae053ab51b4..5dbc6e8f3a6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -17,6 +17,8 @@ package org.apache.gobblin.service.modules.orchestration; +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.RestLiServiceException; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -24,6 +26,7 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.gobblin.runtime.api.LeaseUnavailableException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final SpecCompiler specCompiler; protected final TopologyCatalog topologyCatalog; private final JobStatusRetriever jobStatusRetriever; + private final DagManagementStateStore dagManagementStateStore; protected final MetricContext metricContext; @@ -100,6 +104,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional(null); } + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + try { + if (!dagManagementStateStore.existsLeasableEntity(leaseParams)) { + throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); + } + } catch (IOException exception) { + _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception); + throw new RuntimeException("Error querying leaseArbiterTable", exception); + } + } + } + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties()); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java index c14a7b62386..cba69630957 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -59,6 +60,7 @@ public class MySqlDagManagementStateStoreTest { private ITestMetastoreDatabase testDb; + private static MultiActiveLeaseArbiter leaseArbiter; private MySqlDagManagementStateStore dagManagementStateStore; private static final String TEST_USER = "testUser"; public static final String TEST_PASSWORD = "testPassword"; @@ -68,6 +70,7 @@ public class MySqlDagManagementStateStoreTest { @BeforeClass public void setUp() throws Exception { // Setting up mock DB + this.leaseArbiter = mock(MultiActiveLeaseArbiter.class); this.testDb = TestMetastoreDatabaseFactory.get(); this.dagManagementStateStore = getDummyDMSS(this.testDb); } @@ -92,6 +95,16 @@ public static boolean compareLists(List list1, List list2) { return true; } + @Test + public void testExistsLeasableEntity() throws Exception{ + Mockito.when(leaseArbiter.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + String flowName = "testFlow"; + String flowGroup = "testGroup"; + DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction); + Assert.assertTrue(dagManagementStateStore.existsLeasableEntity(leaseParams)); + } + @Test public void testAddDag() throws Exception { Dag dag = DagTestUtils.buildDag("test", 12345L); @@ -150,9 +163,11 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t TopologySpec topologySpec = LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI); URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI); topologySpecMap.put(specExecURI, topologySpec); + MultiActiveLeaseArbiter multiActiveLeaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); + leaseArbiter = multiActiveLeaseArbiter; MySqlDagManagementStateStore dagManagementStateStore = new MySqlDagManagementStateStore(config, null, null, jobStatusRetriever, - MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase)); + MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase), multiActiveLeaseArbiter); dagManagementStateStore.setTopologySpecMap(topologySpecMap); return dagManagementStateStore; } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 9b132fe0d9b..0553f985b57 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -44,6 +44,7 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final long EPSILON = 10000L; private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1); + private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.90); // NOTE: `sleep`ing this long SIGNIFICANTLY slows tests, but we need a large enough value that exec. variability won't cause spurious failure private static final long LINGER = 20000L; private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1); @@ -53,6 +54,8 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final String CONSTANTS_TABLE = "constants_store"; private static final String flowGroup = "testFlowGroup"; private static final String flowGroup2 = "testFlowGroup2"; + private static final String flowGroup3 = "testFlowGroup3"; + private static final String flowGroup4 = "testFlowGroup4"; private static final String flowName = "testFlowName"; private static final String jobName = "testJobName"; private static final long flowExecutionId = 12345677L; @@ -70,6 +73,14 @@ public class MysqlMultiActiveLeaseArbiterTest { new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); private static final DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, false, eventTimeMillis); + private static final DagActionStore.DagAction launchDagAction3 = + new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + private static final DagActionStore.LeaseParams + launchLeaseParams3 = new DagActionStore.LeaseParams(launchDagAction3, false, eventTimeMillis); + private static final DagActionStore.DagAction launchDagAction4 = + new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + private static final DagActionStore.LeaseParams + launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis); private static final Timestamp dummyTimestamp = new Timestamp(99999); private ITestMetastoreDatabase testDb; private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter; @@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception { <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } + /* + test to verify if leasable entity is unavailable before epsilon time + to account for clock drift + */ + @Test + public void testWhenLeasableEntityUnavailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams3); + Thread.sleep(LESS_THAN_EPSILON); + Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams3)); + } + + /* + test to verify if leasable entity exists post epsilon time + */ + @Test + public void testWhenLeasableEntityAvailable() throws Exception{ + LeaseAttemptStatus firstLaunchStatus = + mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true); + Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus); + completeLeaseHelper(launchLeaseParams4); + Thread.sleep(MORE_THAN_EPSILON); + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams4)); + } + /* Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row matches the primary key in the table. If such a row does exist, the method should disregard the resulting SQL error and return 0 rows updated, indicating diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index ee5f14cb873..72364d7d580 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -18,13 +18,19 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.File; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.runtime.api.LeaseUnavailableException; +import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; +import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -86,6 +92,8 @@ public class OrchestratorTest { private FlowSpec flowSpec; private ITestMetastoreDatabase testMetastoreDatabase; private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator; + private DagManagementStateStore dagManagementStateStore; + private SpecCompiler specCompiler; @BeforeClass public void setUpClass() throws Exception { @@ -107,7 +115,7 @@ public void setUp() throws Exception { flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR); this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties, "OrchestratorCatalogTest"); - + this.specCompiler = Mockito.mock(SpecCompiler.class); this.topologyCatalog = new TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties), Optional.of(logger)); this.serviceLauncher.addService(topologyCatalog); @@ -116,9 +124,10 @@ public void setUp() throws Exception { this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true); this.serviceLauncher.addService(flowCatalog); - + MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); + this.dagManagementStateStore=dagManagementStateStore; SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties)); @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + @Test + public void testOnAddSpec_withFlowSpec_Available() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + AddSpecResponse response = new AddSpecResponse<>(new Object()); + Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); + AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + } + @Test public void deleteFlowSpec() throws Throwable { // TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`