diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index 4025cf0dbc1..dd17e2d21e3 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -25,7 +25,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; import org.apache.gobblin.kafka.client.Kafka09ConsumerClient; -import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.modules.orchestration.DagManager; @@ -66,8 +65,8 @@ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled) { - super(topic, config, mock(DagActionStore.class), mock(DagManager.class), numThreads, mock(FlowCatalog.class), - mock(Orchestrator.class), isMultiActiveSchedulerEnabled); + super(topic, config, mock(DagManager.class), numThreads, mock(FlowCatalog.class), mock(Orchestrator.class), + isMultiActiveSchedulerEnabled); } protected void processMessageForTest(DecodeableKafkaRecord record) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index c90841b50a1..ec837658469 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -19,6 +19,7 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -185,6 +186,10 @@ public DagId(String flowGroup, String flowName, String flowExecutionId) { public String toString() { return Joiner.on("_").join(flowGroup, flowName, flowExecutionId); } + + DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType actionType) { + return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, actionType); + } } private final BlockingQueue>[] runQueue; @@ -218,7 +223,9 @@ public String toString() { private final long failedDagRetentionTime; private final DagManagerMetrics dagManagerMetrics; + @Getter @Inject(optional=true) + @VisibleForTesting protected Optional dagActionStore; private volatile boolean isActive = false; @@ -312,9 +319,16 @@ public synchronized void addDag(Dag dag, boolean persist, bool log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag); return; } + + DagId dagId = DagManagerUtils.generateDagId(dag); if (persist) { - //Persist the dag + // Persist the dag this.dagStateStore.writeCheckpoint(dag); + // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted + // to avoid duplicate executions upon leadership change + if (this.dagActionStore.isPresent()) { + this.dagActionStore.get().deleteDagAction(dagId.toDagAction(DagActionStore.FlowActionType.LAUNCH)); + } } int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads); // Add the dag to the specific queue determined by flowExecutionId @@ -322,7 +336,7 @@ public synchronized void addDag(Dag dag, boolean persist, bool // flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the // specific DagManagerThread queue if (!this.runQueue[queueId].offer(dag)) { - throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue"); + throw new IOException("Could not add dag" + dagId + "to queue"); } if (setStatus) { submitEventsAndSetStatus(dag); @@ -511,8 +525,6 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId); this.dagManagerMetrics.incrementFailedLaunchCount(); } - // Upon handling the action, delete it so on leadership change this is not duplicated - this.dagActionStore.get().deleteDagAction(launchAction); } catch (URISyntaxException e) { log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e); this.dagManagerMetrics.incrementFailedLaunchCount(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 0461bb11f30..023f8711745 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -250,6 +250,16 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil } Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get()); + java.util.Optional flowExecutionId = TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata); + + // Unexpected result because flowExecutionId should be provided by above call too 'addFlowExecutionIdIfAbsent' + if (!flowExecutionId.isPresent()) { + _log.warn("FlowMetadata does not contain flowExecutionId when it should have been provided. Skipping execution " + + "of: {}", spec); + return; + } + DagActionStore.DagAction flowAction = + new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId.get(), DagActionStore.FlowActionType.LAUNCH); // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly // Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later @@ -266,9 +276,6 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil return; } - String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD); - DagActionStore.DagAction flowAction = - new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH); flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent); _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]", flowAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis); @@ -306,7 +313,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil Spec jobSpec = jobExecutionPlan.getJobSpec(); if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { - _log.warn("JobSpec does not contain flowExecutionId."); + _log.warn("JobSpec does not contain flowExecutionId: {}", jobSpec); } Map jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan); @@ -335,9 +342,10 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } - public void submitFlowToDagManager(FlowSpec flowSpec, Optional optionalFlowExecutionId) throws IOException, InterruptedException { + public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException { Optional> optionalJobExecutionPlanDag = - this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId); + this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, + Optional.of(flowAction.getFlowExecutionId())); if (optionalJobExecutionPlanDag.isPresent()) { submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get()); } else { @@ -349,7 +357,7 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Optional optionalF public void submitFlowToDagManager(FlowSpec flowSpec, Dag jobExecutionPlanDag) throws IOException { try { - //Send the dag to the DagManager. + // Send the dag to the DagManager this.dagManager.get().addDag(jobExecutionPlanDag, true, true); } catch (Exception ex) { String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java index 99661305fd3..7947ed0aead 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; +import java.util.Optional; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.api.FlowSpec; @@ -47,6 +48,13 @@ static Map getFlowMetadata(Config flowConfig) { return metadata; } + /** + * Retrieves a flowExecutionId from flowMetadata map if one exists otherwise an empty Optional. + */ + public static Optional getFlowExecutionIdFromFlowMetadata(Map flowMetadata) { + return Optional.ofNullable(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)); + } + static Map getJobMetadata(Map flowMetadata, JobExecutionPlan jobExecutionPlan) { Map jobMetadata = Maps.newHashMap(); JobSpec jobSpec = jobExecutionPlan.getJobSpec(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java index e9497c52650..573faff0150 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java @@ -18,7 +18,6 @@ package org.apache.gobblin.service.monitoring; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -78,7 +77,6 @@ public String load(String key) throws Exception { protected LoadingCache dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader); - protected DagActionStore dagActionStore; @Getter @VisibleForTesting protected DagManager dagManager; @@ -90,13 +88,12 @@ public String load(String key) throws Exception { // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer // client itself to determine all Kafka related information dynamically rather than through the config. - public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore dagActionStore, DagManager dagManager, - int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) { + public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads, + FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) { // Differentiate group id for each host super(topic, config.withValue(GROUP_ID_KEY, ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())), numThreads); - this.dagActionStore = dagActionStore; this.dagManager = dagManager; this.flowCatalog = flowCatalog; this.orchestrator = orchestrator; @@ -168,7 +165,7 @@ protected void processMessage(DecodeableKafkaRecord message) { throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler " + "mode for flowAction: %s", dagAction)); } - submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId); + submitFlowToDagManagerHelper(dagAction); } else { log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType); this.unexpectedErrors.mark(); @@ -195,15 +192,15 @@ protected void processMessage(DecodeableKafkaRecord message) { dagActionsSeenCache.put(changeIdentifier, changeIdentifier); } - protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) { + protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) { // Retrieve job execution plan by recompiling the flow spec to send to the DagManager - FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); + FlowId flowId = new FlowId().setFlowGroup(dagAction.getFlowGroup()).setFlowName(dagAction.getFlowName()); FlowSpec spec = null; try { URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId); spec = (FlowSpec) flowCatalog.getSpecs(flowUri); // Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId - this.orchestrator.submitFlowToDagManager(spec, Optional.of(flowExecutionId)); + this.orchestrator.submitFlowToDagManager(spec, dagAction); } catch (URISyntaxException e) { log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage()); this.failedFlowLaunchSubmissions.mark(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java index 5806949a8b1..1dba94baec1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java @@ -42,7 +42,6 @@ public class DagActionStoreChangeMonitorFactory implements Provider