Skip to content

Commit ffa19e1

Browse files
committed
Avoid Adhoc flow spec addition for non leasable entity
1 parent 2afab69 commit ffa19e1

File tree

11 files changed

+157
-6
lines changed

11 files changed

+157
-6
lines changed

gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.gobblin.metrics.ServiceMetricNames;
6161
import org.apache.gobblin.runtime.api.FlowSpec;
6262
import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
63+
import org.apache.gobblin.runtime.api.LeaseUnavailableException;
6364
import org.apache.gobblin.runtime.api.SpecNotFoundException;
6465
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
6566
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
256257
responseMap = this.flowCatalog.put(flowSpec, true);
257258
} catch (QuotaExceededException e) {
258259
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
259-
} catch (Throwable e) {
260+
} catch(LeaseUnavailableException e){
261+
throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
262+
}
263+
catch (Throwable e) {
260264
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
261265
log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
262266
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.apache.gobblin.runtime.api;
2+
3+
/**
 * Unchecked exception signalling that a lease on an entity cannot be acquired at the moment.
 *
 * <p>It is raised when an adhoc flow spec is added while the lease for the same flow is reported as
 * not available, and is translated by the REST layer into an HTTP 409 (conflict) response.
 */
public class LeaseUnavailableException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  /**
   * @param message human-readable reason the lease is unavailable; surfaced to the API caller
   */
  public LeaseUnavailableException(String message) {
    super(message);
  }

  /**
   * @param message human-readable reason the lease is unavailable; surfaced to the API caller
   * @param cause the underlying failure, preserved so the original stack trace is not lost
   */
  public LeaseUnavailableException(String message, Throwable cause) {
    super(message, cause);
  }
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java

+5
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ public interface DagManagementStateStore {
103103
*/
104104
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
105105

106+
/**
107+
* Returns {@code true} if a lease can currently be acquired on the entity described by {@code leaseParams}, else {@code false}.
108+
*/
109+
boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException;
110+
106111
/**
107112
* Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}.
108113
* Both params are returned as optional and are empty if not present in the store.

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java

+5
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
9595
leaseAttemptStatus.getClass().getName()));
9696
}
9797

98+
@Override
99+
public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
100+
return decoratedMultiActiveLeaseArbiter.existsLeasableEntity(leaseParams);
101+
}
102+
98103
@Override
99104
public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status)
100105
throws IOException {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java

+3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public interface MultiActiveLeaseArbiter {
6161
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
6262
throws IOException;
6363

64+
boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams)
65+
throws IOException;
66+
6467
/**
6568
* This method is used to indicate the owner of the lease has successfully completed required actions while holding
6669
* the lease of the dag action event. It marks the lease as "no longer leasing", if the eventTimeMillis and

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
6565
// todo - these two stores should merge
6666
private DagStateStoreWithDagNodes dagStateStore;
6767
private DagStateStoreWithDagNodes failedDagStateStore;
68+
private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
6869
private final JobStatusRetriever jobStatusRetriever;
6970
private boolean dagStoresInitialized = false;
7071
private final UserQuotaManager quotaManager;
@@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
7980

8081
@Inject
8182
public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager,
82-
JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
83+
JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore, MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
8384
this.quotaManager = userQuotaManager;
8485
this.config = config;
8586
this.flowCatalog = flowCatalog;
8687
this.jobStatusRetriever = jobStatusRetriever;
8788
this.dagManagerMetrics.activate();
8889
this.dagActionStore = dagActionStore;
90+
this.multiActiveLeaseArbiter = multiActiveLeaseArbiter;
8991
}
9092

9193
// It should be called after topology spec map is set
@@ -168,6 +170,11 @@ public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
168170
this.dagStateStore.updateDagNode(dagNode);
169171
}
170172

173+
@Override
174+
public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
175+
return multiActiveLeaseArbiter.existsLeasableEntity(leaseParams);
176+
}
177+
171178
@Override
172179
public Optional<Dag<JobExecutionPlan>> getDag(Dag.DagId dagId) throws IOException {
173180
return Optional.ofNullable(this.dagStateStore.getDag(dagId));

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java

+6
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,12 @@ else if (leaseValidityStatus == 2) {
363363
}
364364
}
365365

366+
@Override
367+
public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
368+
Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
369+
return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;
370+
}
371+
366372
/**
367373
* Checks leaseArbiterTable for an existing entry for this dag action and event time
368374
*/

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
package org.apache.gobblin.service.modules.orchestration;
1919

20+
import com.linkedin.restli.common.HttpStatus;
21+
import com.linkedin.restli.server.RestLiServiceException;
2022
import java.io.IOException;
2123
import java.net.URI;
2224
import java.util.Collections;
2325
import java.util.List;
2426
import java.util.Properties;
2527
import java.util.concurrent.TimeUnit;
2628

29+
import org.apache.gobblin.runtime.api.LeaseUnavailableException;
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
2932

@@ -78,6 +81,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
7881
protected final SpecCompiler specCompiler;
7982
protected final TopologyCatalog topologyCatalog;
8083
private final JobStatusRetriever jobStatusRetriever;
84+
private final DagManagementStateStore dagManagementStateStore;
8185

8286
protected final MetricContext metricContext;
8387

@@ -100,6 +104,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional<Log
100104
this.topologyCatalog = topologyCatalog;
101105
this.flowLaunchHandler = flowLaunchHandler;
102106
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
107+
this.dagManagementStateStore = dagManagementStateStore;
103108
this.jobStatusRetriever = jobStatusRetriever;
104109
this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
105110
// todo remove the need to set topology factory outside of constructor GOBBLIN-2056
@@ -125,12 +130,37 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
125130
_log.info("New Spec detected of type TopologySpec: " + addedSpec);
126131
this.specCompiler.onAddSpec(addedSpec);
127132
} else if (addedSpec instanceof FlowSpec) {
128-
_log.info("New Spec detected of type FlowSpec: " + addedSpec);
133+
handleFlowSpec((FlowSpec) addedSpec);
129134
return this.specCompiler.onAddSpec(addedSpec);
130135
}
131136
return new AddSpecResponse<>(null);
132137
}
133138

139+
private void handleFlowSpec(FlowSpec flowSpec) {
140+
_log.info(String.format("New Spec detected of type FlowSpec: " + flowSpec));
141+
if (!flowSpec.isScheduled()) {
142+
processFlowSpecForAdhocFlows(flowSpec);
143+
}
144+
this.specCompiler.onAddSpec(flowSpec);
145+
}
146+
147+
private void processFlowSpecForAdhocFlows(FlowSpec flowSpec) {
148+
Config flowConfig = flowSpec.getConfig();
149+
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
150+
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
151+
DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
152+
FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
153+
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
154+
try {
155+
if (!dagManagementStateStore.existsLeasableEntity(leaseParams)) {
156+
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
157+
}
158+
} catch (IOException exception) {
159+
_log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception);
160+
throw new RuntimeException("Error querying leaseArbiterTable", exception);
161+
}
162+
}
163+
134164
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
135165
onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
136166
}

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.Set;
2626

27+
import org.mockito.Mockito;
2728
import org.testng.Assert;
2829
import org.testng.annotations.AfterClass;
2930
import org.testng.annotations.BeforeClass;
@@ -59,6 +60,7 @@
5960
public class MySqlDagManagementStateStoreTest {
6061

6162
private ITestMetastoreDatabase testDb;
63+
private MultiActiveLeaseArbiter leaseArbiter;
6264
private MySqlDagManagementStateStore dagManagementStateStore;
6365
private static final String TEST_USER = "testUser";
6466
public static final String TEST_PASSWORD = "testPassword";
@@ -68,6 +70,7 @@ public class MySqlDagManagementStateStoreTest {
6870
@BeforeClass
6971
public void setUp() throws Exception {
7072
// Setting up mock DB
73+
this.leaseArbiter = mock(MultiActiveLeaseArbiter.class);
7174
this.testDb = TestMetastoreDatabaseFactory.get();
7275
this.dagManagementStateStore = getDummyDMSS(this.testDb);
7376
}
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
9295
return true;
9396
}
9497

98+
@Test
99+
public void testExistsLeasableEntity() throws Exception{
100+
Mockito.when(leaseArbiter.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
101+
String flowName = "testFlow";
102+
String flowGroup = "testGroup";
103+
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
104+
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
105+
leaseArbiter.existsLeasableEntity(leaseParams);
106+
}
107+
95108
@Test
96109
public void testAddDag() throws Exception {
97110
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
@@ -150,9 +163,10 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t
150163
TopologySpec topologySpec = LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
151164
URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
152165
topologySpecMap.put(specExecURI, topologySpec);
166+
MultiActiveLeaseArbiter multiActiveLeaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class);
153167
MySqlDagManagementStateStore dagManagementStateStore =
154168
new MySqlDagManagementStateStore(config, null, null, jobStatusRetriever,
155-
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
169+
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase), multiActiveLeaseArbiter);
156170
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
157171
return dagManagementStateStore;
158172
}

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
public class MysqlMultiActiveLeaseArbiterTest {
4545
private static final long EPSILON = 10000L;
4646
private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1);
47+
private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.95);
4748
private static final long LINGER = 50000L;
4849
private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1);
4950
private static final String USER = "testUser";
@@ -193,6 +194,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
193194
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
194195
}
195196

197+
/*
198+
test to verify if leasable entity is unavailable before epsilon time
199+
to account for clock drift
200+
*/
201+
@Test
202+
public void testWhenLeasableEntityUnavailable() throws Exception{
203+
LeaseAttemptStatus firstLaunchStatus =
204+
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams, true);
205+
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
206+
completeLeaseHelper(launchLeaseParams);
207+
Thread.sleep(LESS_THAN_EPSILON);
208+
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams));
209+
}
210+
211+
/*
212+
test to verify if leasable entity exists post epsilon time
213+
*/
214+
@Test
215+
public void testWhenLeasableEntityAvailable() throws Exception{
216+
LeaseAttemptStatus firstLaunchStatus =
217+
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams2, true);
218+
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
219+
completeLeaseHelper(launchLeaseParams2);
220+
Thread.sleep(MORE_THAN_EPSILON);
221+
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams2));
222+
}
223+
196224
/*
197225
Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row matches the primary key in the table.
198226
If such a row does exist, the method should disregard the resulting SQL error and return 0 rows updated, indicating

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java

+44-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818
package org.apache.gobblin.service.modules.orchestration;
1919

2020
import java.io.File;
21+
import java.io.IOException;
2122
import java.net.URI;
2223
import java.net.URISyntaxException;
2324
import java.util.Collection;
2425
import java.util.Properties;
2526

2627
import org.apache.commons.io.FileUtils;
28+
import org.apache.gobblin.config.ConfigBuilder;
29+
import org.apache.gobblin.runtime.api.LeaseUnavailableException;
30+
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
31+
import org.apache.gobblin.service.modules.flow.SpecCompiler;
2732
import org.apache.hadoop.fs.Path;
33+
import org.mockito.Mockito;
2834
import org.slf4j.Logger;
2935
import org.slf4j.LoggerFactory;
3036
import org.testng.Assert;
@@ -86,6 +92,8 @@ public class OrchestratorTest {
8692
private FlowSpec flowSpec;
8793
private ITestMetastoreDatabase testMetastoreDatabase;
8894
private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
95+
private DagManagementStateStore dagManagementStateStore;
96+
private SpecCompiler specCompiler;
8997

9098
@BeforeClass
9199
public void setUpClass() throws Exception {
@@ -107,7 +115,7 @@ public void setUp() throws Exception {
107115
flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);
108116

109117
this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties, "OrchestratorCatalogTest");
110-
118+
this.specCompiler = Mockito.mock(SpecCompiler.class);
111119
this.topologyCatalog = new TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties),
112120
Optional.of(logger));
113121
this.serviceLauncher.addService(topologyCatalog);
@@ -116,9 +124,10 @@ public void setUp() throws Exception {
116124
this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true);
117125

118126
this.serviceLauncher.addService(flowCatalog);
119-
127+
MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class);
120128
MySqlDagManagementStateStore dagManagementStateStore =
121129
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
130+
this.dagManagementStateStore=dagManagementStateStore;
122131

123132
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
124133

@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
311320
"SpecProducer should contain 0 Spec after addition");
312321
}
313322

323+
/*
324+
If another flow has already acquired lease for this flowspec details within
325+
epsilon time, then we do not execute this flow, hence do not process and store the spec
326+
and throw LeaseUnavailableException
327+
*/
328+
@Test(expectedExceptions = LeaseUnavailableException.class)
329+
public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
330+
ConfigBuilder configBuilder = ConfigBuilder.create()
331+
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
332+
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
333+
Config config = configBuilder.build();
334+
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
335+
Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
336+
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
337+
}
338+
339+
@Test
340+
public void testOnAddSpec_withFlowSpec_Available() throws IOException {
341+
ConfigBuilder configBuilder = ConfigBuilder.create()
342+
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
343+
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
344+
.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
345+
.addPrimitive("gobblin.flow.sourceIdentifier", "source")
346+
.addPrimitive("gobblin.flow.destinationIdentifier", "destination");
347+
Config config = configBuilder.build();
348+
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
349+
Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
350+
AddSpecResponse response = new AddSpecResponse<>(new Object());
351+
Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
352+
AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
353+
Assert.assertNotNull(addSpecResponse);
354+
}
355+
314356
@Test
315357
public void deleteFlowSpec() throws Throwable {
316358
// TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`

0 commit comments

Comments
 (0)