-
Notifications
You must be signed in to change notification settings - Fork 749
Delete Launch Action Events After Processing #3837
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| import com.codahale.metrics.Meter; | ||
| import com.codahale.metrics.Timer; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Joiner; | ||
| import com.google.common.base.Optional; | ||
| import com.google.common.base.Preconditions; | ||
|
|
@@ -218,8 +219,10 @@ public String toString() { | |
| private final long failedDagRetentionTime; | ||
| private final DagManagerMetrics dagManagerMetrics; | ||
|
|
||
| @Getter | ||
| @Inject(optional=true) | ||
| protected Optional<DagActionStore> dagActionStore; | ||
| @VisibleForTesting | ||
| public Optional<DagActionStore> dagActionStore; | ||
|
|
||
| private volatile boolean isActive = false; | ||
|
|
||
|
|
@@ -312,17 +315,25 @@ public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, bool | |
| log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag); | ||
| return; | ||
| } | ||
|
|
||
| DagId dagId = DagManagerUtils.generateDagId(dag); | ||
| if (persist) { | ||
| //Persist the dag | ||
| // Persist the dag | ||
| this.dagStateStore.writeCheckpoint(dag); | ||
| // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted | ||
| // to avoid duplicate executions upon leadership change | ||
| if (this.dagActionStore.isPresent()) { | ||
| this.dagActionStore.get().deleteDagAction(new DagActionStore.DagAction(dagId.getFlowGroup(), | ||
| dagId.getFlowName(), dagId.getFlowExecutionId(), DagActionStore.FlowActionType.LAUNCH)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggest adding a method to |
||
| } | ||
| } | ||
| int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads); | ||
| // Add the dag to the specific queue determined by flowExecutionId | ||
| // Flow cancellation request has to be forwarded to the same DagManagerThread where the | ||
| // flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the | ||
| // specific DagManagerThread queue | ||
| if (!this.runQueue[queueId].offer(dag)) { | ||
| throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue"); | ||
| throw new IOException("Could not add dag" + dagId + "to queue"); | ||
| } | ||
| if (setStatus) { | ||
| submitEventsAndSetStatus(dag); | ||
|
|
@@ -511,8 +522,6 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { | |
| log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId); | ||
| this.dagManagerMetrics.incrementFailedLaunchCount(); | ||
| } | ||
| // Upon handling the action, delete it so on leadership change this is not duplicated | ||
| this.dagActionStore.get().deleteDagAction(launchAction); | ||
| } catch (URISyntaxException e) { | ||
| log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e); | ||
| this.dagManagerMetrics.incrementFailedLaunchCount(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,14 @@ static Map<String, String> getFlowMetadata(Config flowConfig) { | |
| return metadata; | ||
| } | ||
|
|
||
| /** | ||
| * Retrieves a flowExecutionId from flowMetadata map. Throws NPE if flowExecutionId is missing to prevent proceeding | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| * with a flow execution that does not have a flowExecutionId. | ||
| */ | ||
| public static String getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) { | ||
| return flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD); | ||
| } | ||
|
|
||
| static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobExecutionPlan jobExecutionPlan) { | ||
| Map<String, String> jobMetadata = Maps.newHashMap(); | ||
| JobSpec jobSpec = jobExecutionPlan.getJobSpec(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,6 @@ | |
| package org.apache.gobblin.service.monitoring; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Optional; | ||
| import com.google.common.cache.CacheBuilder; | ||
| import com.google.common.cache.CacheLoader; | ||
| import com.google.common.cache.LoadingCache; | ||
|
|
@@ -78,7 +77,6 @@ public String load(String key) throws Exception { | |
| protected LoadingCache<String, String> | ||
| dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader); | ||
|
|
||
| protected DagActionStore dagActionStore; | ||
| @Getter | ||
| @VisibleForTesting | ||
| protected DagManager dagManager; | ||
|
|
@@ -90,13 +88,12 @@ public String load(String key) throws Exception { | |
|
|
||
| // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer | ||
| // client itself to determine all Kafka related information dynamically rather than through the config. | ||
| public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore dagActionStore, DagManager dagManager, | ||
| int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) { | ||
| public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads, | ||
| FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) { | ||
| // Differentiate group id for each host | ||
| super(topic, config.withValue(GROUP_ID_KEY, | ||
| ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())), | ||
| numThreads); | ||
| this.dagActionStore = dagActionStore; | ||
| this.dagManager = dagManager; | ||
| this.flowCatalog = flowCatalog; | ||
| this.orchestrator = orchestrator; | ||
|
|
@@ -168,7 +165,7 @@ protected void processMessage(DecodeableKafkaRecord message) { | |
| throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler " | ||
| + "mode for flowAction: %s", dagAction)); | ||
| } | ||
| submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId); | ||
| submitFlowToDagManagerHelper(dagAction); | ||
| } else { | ||
| log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType); | ||
| this.unexpectedErrors.mark(); | ||
|
|
@@ -195,15 +192,15 @@ protected void processMessage(DecodeableKafkaRecord message) { | |
| dagActionsSeenCache.put(changeIdentifier, changeIdentifier); | ||
| } | ||
|
|
||
| protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) { | ||
| protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) { | ||
|
Comment on lines
-198
to
+195
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. excellent--much clearer method signature! |
||
| // Retrieve job execution plan by recompiling the flow spec to send to the DagManager | ||
| FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); | ||
| FlowId flowId = new FlowId().setFlowGroup(dagAction.getFlowGroup()).setFlowName(dagAction.getFlowName()); | ||
| FlowSpec spec = null; | ||
| try { | ||
| URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); | ||
| spec = (FlowSpec) flowCatalog.getSpecs(flowUri); | ||
| // Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId | ||
| this.orchestrator.submitFlowToDagManager(spec, Optional.of(flowExecutionId)); | ||
| this.orchestrator.submitFlowToDagManager(spec, dagAction); | ||
| } catch (URISyntaxException e) { | ||
| log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); | ||
| this.failedFlowLaunchSubmissions.mark(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -130,7 +130,10 @@ void testAddDeleteSpec() throws Exception { | |
| .thenReturn(Collections.singletonList(flowExecutionId3)); | ||
|
|
||
| // mock add spec | ||
| // for very first dag to be added, add dag action to store and check its deleted by the addDag call | ||
| dagManager.getDagActionStore().get().addDagAction("group0", "flow0", Long.toString(flowExecutionId1), DagActionStore.FlowActionType.LAUNCH); | ||
| dagManager.addDag(dag1, true, true); | ||
| Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0", "flow0", Long.toString(flowExecutionId1), DagActionStore.FlowActionType.LAUNCH)); | ||
| dagManager.addDag(dag2, true, true); | ||
| dagManager.addDag(dag3, true, true); | ||
|
|
||
|
|
@@ -346,6 +349,9 @@ class MockedDagManager extends DagManager { | |
| public MockedDagManager(Config config, boolean instrumentationEnabled) { | ||
| super(config, createJobStatusRetriever(), Mockito.mock(SharedFlowMetricsSingleton.class), | ||
| Mockito.mock(FlowStatusGenerator.class), Mockito.mock(FlowCatalog.class), instrumentationEnabled); | ||
| // this.dagActionStore = Optional.of(Mockito.mock(MysqlDagActionStore.class)); //Mockito.mock(DagActionStore.class)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove? |
||
| // this.dagActionStore = mockMysqlDagActionStore(); | ||
| // this.dagStateStore = Mockito.mock(DagStateStore.class); | ||
| } | ||
|
|
||
| private static JobStatusRetriever createJobStatusRetriever() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couldn't this instead be package-protected?