Skip to content

Commit 2ca43c7

Browse files
committed
Update unit tests
1 parent 3fa68a1 commit 2ca43c7

File tree

9 files changed

+49
-19
lines changed

9 files changed

+49
-19
lines changed

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public interface DagManagementStateStore {
108108
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
109109
* was triggered, and if the dag action event we're checking on is a reminder event
110110
*/
111-
boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException;
111+
boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
112112

113113
/**
114114
* Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}.

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
9191
}
9292

9393
@Override
94-
public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
95-
return decoratedMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(leaseParams);
94+
public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
95+
return decoratedMultiActiveLeaseArbiter.isLeaseAcquirable(leaseParams);
9696
}
9797

9898
@Override

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,12 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole
6464
/**
6565
* This method checks if lease can be acquired on provided flow in lease params
6666
* returns true if entry for the same flow does not exists within epsilon time
67-
* in leaseArbiterStore
67+
* in leaseArbiterStore, else returns false
6868
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
6969
* was triggered, and if the dag action event we're checking on is a reminder event
70+
* @return true if lease can be acquired on the flow passed in the lease params, false otherwise
7071
*/
71-
boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams)
72+
boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)
7273
throws IOException;
7374

7475
/**

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
171171
}
172172

173173
@Override
174-
public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
175-
return multiActiveLeaseArbiter.canAcquireLeaseOnEntity(leaseParams);
174+
public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
175+
return multiActiveLeaseArbiter.isLeaseAcquirable(leaseParams);
176176
}
177177

178178
@Override

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,12 @@ else if (leaseValidityStatus == 2) {
362362
}
363363
}
364364

365+
/*
366+
Determines if a lease can be acquired for the given flow. A lease is acquirable if
367+
no existing lease record exists in arbiter table or the record is older then epsilon time
368+
*/
365369
@Override
366-
public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
370+
public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
367371
Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
368372
return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;
369373
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.gobblin.service.modules.orchestration;
1919

20-
import com.linkedin.restli.common.HttpStatus;
21-
import com.linkedin.restli.server.RestLiServiceException;
2220
import java.io.IOException;
2321
import java.net.URI;
2422
import java.util.Collections;
@@ -139,16 +137,22 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
139137
return new AddSpecResponse<>(null);
140138
}
141139

140+
/*
141+
validates if lease can be acquired on the provided flowSpec,
142+
else throw LeaseUnavailableException
143+
*/
142144
private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
143145
if (!flowSpec.isScheduled()) {
144146
Config flowConfig = flowSpec.getConfig();
145147
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
146148
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
149+
147150
DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
148151
FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
149152
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
153+
_log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
150154
try {
151-
if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) {
155+
if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
152156
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
153157
}
154158
} catch (IOException exception) {

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
9797

9898
@Test
9999
public void testcanAcquireLeaseOnEntity() throws Exception{
100-
Mockito.when(leaseArbiter.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
100+
Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
101101
String flowName = "testFlow";
102102
String flowGroup = "testGroup";
103103
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
104104
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
105-
Assert.assertTrue(dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams));
105+
Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
106106
}
107107

108108
@Test

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public void testWhenLeasableEntityUnavailable() throws Exception{
223223
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
224224
completeLeaseHelper(launchLeaseParams3);
225225
Thread.sleep(LESS_THAN_EPSILON);
226-
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(launchLeaseParams3));
226+
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
227227
}
228228

229229
/*
@@ -236,7 +236,7 @@ public void testWhenLeasableEntityAvailable() throws Exception{
236236
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
237237
completeLeaseHelper(launchLeaseParams4);
238238
Thread.sleep(MORE_THAN_EPSILON);
239-
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.canAcquireLeaseOnEntity(launchLeaseParams4));
239+
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4));
240240
}
241241

242242
/*

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -326,18 +326,40 @@ public void createFlowSpec() throws Throwable {
326326
and throw LeaseUnavailableException
327327
*/
328328
@Test(expectedExceptions = LeaseUnavailableException.class)
329-
public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
329+
public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException {
330330
ConfigBuilder configBuilder = ConfigBuilder.create()
331331
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
332332
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
333333
Config config = configBuilder.build();
334334
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
335-
Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
335+
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
336336
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
337337
}
338338

339+
/*
340+
If no other flow has acquired lease within the epsilon time, then flow
341+
compilation and addition to the store occurs normally
342+
*/
343+
@Test
344+
public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException {
345+
ConfigBuilder configBuilder = ConfigBuilder.create()
346+
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
347+
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
348+
.addPrimitive("gobblin.flow.sourceIdentifier", "source")
349+
.addPrimitive("gobblin.flow.destinationIdentifier", "destination");
350+
Config config = configBuilder.build();
351+
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
352+
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
353+
AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
354+
Assert.assertNotNull(addSpecResponse);
355+
}
356+
357+
/*
358+
For Scheduled flow lease acquirable check does not occur,
359+
and flow compilation occurs successfully
360+
*/
339361
@Test
340-
public void testOnAddSpec_withFlowSpec_Available() throws IOException {
362+
public void onAddSpecForScheduledFlow() throws IOException {
341363
ConfigBuilder configBuilder = ConfigBuilder.create()
342364
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
343365
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
@@ -346,7 +368,6 @@ public void testOnAddSpec_withFlowSpec_Available() throws IOException {
346368
.addPrimitive("gobblin.flow.destinationIdentifier", "destination");
347369
Config config = configBuilder.build();
348370
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
349-
Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
350371
AddSpecResponse response = new AddSpecResponse<>(new Object());
351372
Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
352373
AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);

0 commit comments

Comments
 (0)