Skip to content

Commit

Permalink
[GOBBLIN-2173] Avoid Adhoc flow spec addition for non leasable entity
Browse files Browse the repository at this point in the history
  • Loading branch information
vsinghal85 committed Nov 15, 2024
1 parent 13a6926 commit 618f357
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.runtime.api.LeaseUnavailableException;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
Expand Down Expand Up @@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
} catch(LeaseUnavailableException e){
throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
}
catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.gobblin.runtime.api;

public class LeaseUnavailableException extends RuntimeException {
public LeaseUnavailableException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public interface DagManagementStateStore {
*/
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

/**
* Returns true if lease can be acquired on entity, else returns false
*/
boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException;

/**
* Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}.
* Both params are returned as optional and are empty if not present in the store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus (%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams));
}

@Override
public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
return decoratedMultiActiveLeaseArbiter.existsLeasableEntity(leaseParams);
}

@Override
public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
throws IOException;

boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams)
throws IOException;

/**
* This method is used to indicate the owner of the lease has successfully completed required actions while holding
* the lease of the dag action event. It marks the lease as "no longer leasing", if the eventTimeMillis and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
// todo - these two stores should merge
private DagStateStoreWithDagNodes dagStateStore;
private DagStateStoreWithDagNodes failedDagStateStore;
private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
private final JobStatusRetriever jobStatusRetriever;
private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
Expand All @@ -79,13 +80,14 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {

@Inject
public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager,
JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) {
JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore, MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
this.jobStatusRetriever = jobStatusRetriever;
this.dagManagerMetrics.activate();
this.dagActionStore = dagActionStore;
this.multiActiveLeaseArbiter = multiActiveLeaseArbiter;
}

// It should be called after topology spec map is set
Expand Down Expand Up @@ -168,6 +170,11 @@ public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
this.dagStateStore.updateDagNode(dagNode);
}

@Override
public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
return multiActiveLeaseArbiter.existsLeasableEntity(leaseParams);
}

@Override
public Optional<Dag<JobExecutionPlan>> getDag(Dag.DagId dagId) throws IOException {
return Optional.ofNullable(this.dagStateStore.getDag(dagId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) {
}
}

@Override
public boolean existsLeasableEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;
}

/**
* Checks leaseArbiterTable for an existing entry for this dag action and event time
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.gobblin.service.modules.orchestration;

import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.RestLiServiceException;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.gobblin.runtime.api.LeaseUnavailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,6 +81,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
protected final SpecCompiler specCompiler;
protected final TopologyCatalog topologyCatalog;
private final JobStatusRetriever jobStatusRetriever;
private final DagManagementStateStore dagManagementStateStore;

protected final MetricContext metricContext;

Expand All @@ -100,6 +104,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional<Log
this.topologyCatalog = topologyCatalog;
this.flowLaunchHandler = flowLaunchHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.dagManagementStateStore = dagManagementStateStore;
this.jobStatusRetriever = jobStatusRetriever;
this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
// todo remove the need to set topology factory outside of constructor GOBBLIN-2056
Expand All @@ -125,6 +130,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
_log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
} else if (addedSpec instanceof FlowSpec) {
validateAdhocFlowLeasability((FlowSpec) addedSpec);
_log.info("Orchestrator - onAdd[Flow]Spec: " + addedSpec);
return this.specCompiler.onAddSpec(addedSpec);
} else {
Expand All @@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}

private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
try {
if (!dagManagementStateStore.existsLeasableEntity(leaseParams)) {
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
}
} catch (IOException exception) {
_log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception);
throw new RuntimeException("Error querying leaseArbiterTable", exception);
}
}
}

public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;

import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -59,6 +60,7 @@
public class MySqlDagManagementStateStoreTest {

private ITestMetastoreDatabase testDb;
private static MultiActiveLeaseArbiter leaseArbiter;
private MySqlDagManagementStateStore dagManagementStateStore;
private static final String TEST_USER = "testUser";
public static final String TEST_PASSWORD = "testPassword";
Expand All @@ -68,6 +70,7 @@ public class MySqlDagManagementStateStoreTest {
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
this.leaseArbiter = mock(MultiActiveLeaseArbiter.class);
this.testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore = getDummyDMSS(this.testDb);
}
Expand All @@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
return true;
}

@Test
public void testExistsLeasableEntity() throws Exception{
Mockito.when(leaseArbiter.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
String flowName = "testFlow";
String flowGroup = "testGroup";
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
Assert.assertTrue(dagManagementStateStore.existsLeasableEntity(leaseParams));
}

@Test
public void testAddDag() throws Exception {
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test", 12345L);
Expand Down Expand Up @@ -150,9 +163,11 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t
TopologySpec topologySpec = LaunchDagProcTest.buildNaiveTopologySpec(TEST_SPEC_EXECUTOR_URI);
URI specExecURI = new URI(TEST_SPEC_EXECUTOR_URI);
topologySpecMap.put(specExecURI, topologySpec);
MultiActiveLeaseArbiter multiActiveLeaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class);
leaseArbiter = multiActiveLeaseArbiter;
MySqlDagManagementStateStore dagManagementStateStore =
new MySqlDagManagementStateStore(config, null, null, jobStatusRetriever,
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase));
MysqlDagActionStoreTest.getTestDagActionStore(testMetastoreDatabase), multiActiveLeaseArbiter);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
public class MysqlMultiActiveLeaseArbiterTest {
private static final long EPSILON = 10000L;
private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1);
private static final long LESS_THAN_EPSILON = (long) (EPSILON * 0.90);
// NOTE: `sleep`ing this long SIGNIFICANTLY slows tests, but we need a large enough value that exec. variability won't cause spurious failure
private static final long LINGER = 20000L;
private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1);
Expand All @@ -53,6 +54,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String CONSTANTS_TABLE = "constants_store";
private static final String flowGroup = "testFlowGroup";
private static final String flowGroup2 = "testFlowGroup2";
private static final String flowGroup3 = "testFlowGroup3";
private static final String flowGroup4 = "testFlowGroup4";
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final long flowExecutionId = 12345677L;
Expand All @@ -70,6 +73,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction2, false, eventTimeMillis);
private static final DagActionStore.DagAction launchDagAction3 =
new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams3 = new DagActionStore.LeaseParams(launchDagAction3, false, eventTimeMillis);
private static final DagActionStore.DagAction launchDagAction4 =
new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis);
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private ITestMetastoreDatabase testDb;
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
Expand Down Expand Up @@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}

/*
test to verify if leasable entity is unavailable before epsilon time
to account for clock drift
*/
@Test
public void testWhenLeasableEntityUnavailable() throws Exception{
LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
completeLeaseHelper(launchLeaseParams3);
Thread.sleep(LESS_THAN_EPSILON);
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams3));
}

/*
test to verify if leasable entity exists post epsilon time
*/
@Test
public void testWhenLeasableEntityAvailable() throws Exception{
LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
completeLeaseHelper(launchLeaseParams4);
Thread.sleep(MORE_THAN_EPSILON);
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsLeasableEntity(launchLeaseParams4));
}

/*
Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row matches the primary key in the table.
If such a row does exist, the method should disregard the resulting SQL error and return 0 rows updated, indicating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
package org.apache.gobblin.service.modules.orchestration;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.runtime.api.LeaseUnavailableException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -86,6 +92,8 @@ public class OrchestratorTest {
private FlowSpec flowSpec;
private ITestMetastoreDatabase testMetastoreDatabase;
private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
private DagManagementStateStore dagManagementStateStore;
private SpecCompiler specCompiler;

@BeforeClass
public void setUpClass() throws Exception {
Expand All @@ -107,7 +115,7 @@ public void setUp() throws Exception {
flowProperties.put("specStore.fs.dir", FLOW_SPEC_STORE_DIR);

this.serviceLauncher = new ServiceBasedAppLauncher(orchestratorProperties, "OrchestratorCatalogTest");

this.specCompiler = Mockito.mock(SpecCompiler.class);
this.topologyCatalog = new TopologyCatalog(ConfigUtils.propertiesToConfig(topologyProperties),
Optional.of(logger));
this.serviceLauncher.addService(topologyCatalog);
Expand All @@ -116,9 +124,10 @@ public void setUp() throws Exception {
this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true);

this.serviceLauncher.addService(flowCatalog);

MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class);
MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
this.dagManagementStateStore=dagManagementStateStore;

SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));

Expand Down Expand Up @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
"SpecProducer should contain 0 Spec after addition");
}

/*
If another flow has already acquired lease for this flowspec details within
epsilon time, then we do not execute this flow, hence do not process and store the spec
and throw LeaseUnavailableException
*/
@Test(expectedExceptions = LeaseUnavailableException.class)
public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
ConfigBuilder configBuilder = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
Config config = configBuilder.build();
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
}

@Test
public void testOnAddSpec_withFlowSpec_Available() throws IOException {
ConfigBuilder configBuilder = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
.addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
.addPrimitive("gobblin.flow.sourceIdentifier", "source")
.addPrimitive("gobblin.flow.destinationIdentifier", "destination");
Config config = configBuilder.build();
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
Mockito.when(dagManagementStateStore.existsLeasableEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
AddSpecResponse response = new AddSpecResponse<>(new Object());
Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
Assert.assertNotNull(addSpecResponse);
}

@Test
public void deleteFlowSpec() throws Throwable {
// TODO: fix this lingering inter-test dep from when `@BeforeClass` init, which we've since replaced by `Mockito.verify`-friendly `@BeforeMethod`
Expand Down

0 comments on commit 618f357

Please sign in to comment.