Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManager;
Expand Down Expand Up @@ -66,8 +65,8 @@ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {

/**
 * Test-only constructor that delegates straight to the superclass constructor.
 * The caller-supplied {@code topic}, {@code config}, {@code numThreads} and
 * {@code isMultiActiveSchedulerEnabled} are forwarded as-is; every other collaborator argument is
 * created on the spot with {@code mock(...)}.
 *
 * @param topic topic value handed to the superclass
 * @param config configuration handed to the superclass
 * @param numThreads thread count handed to the superclass
 * @param isMultiActiveSchedulerEnabled flag handed to the superclass
 */
public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
boolean isMultiActiveSchedulerEnabled) {
super(topic, config, mock(DagActionStore.class), mock(DagManager.class), numThreads, mock(FlowCatalog.class),
mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
super(topic, config, mock(DagManager.class), numThreads, mock(FlowCatalog.class), mock(Orchestrator.class),
isMultiActiveSchedulerEnabled);
}

protected void processMessageForTest(DecodeableKafkaRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -185,6 +186,10 @@ public DagId(String flowGroup, String flowName, String flowExecutionId) {
public String toString() {
  // Underscore-join the identifying fields in the order flowGroup, flowName, flowExecutionId
  Joiner underscoreJoiner = Joiner.on("_");
  return underscoreJoiner.join(flowGroup, flowName, flowExecutionId);
}

/**
 * Builds a {@link DagActionStore.DagAction} from this id's flow group, flow name and flow execution id,
 * tagged with the supplied action type.
 *
 * @param actionType the action type to attach to the returned action
 * @return a new dag action carrying this id's fields and {@code actionType}
 */
DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType actionType) {
  DagActionStore.DagAction action =
      new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, actionType);
  return action;
}
}

private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
Expand Down Expand Up @@ -218,7 +223,9 @@ public String toString() {
private final long failedDagRetentionTime;
private final DagManagerMetrics dagManagerMetrics;

@Getter
@Inject(optional=true)
@VisibleForTesting
protected Optional<DagActionStore> dagActionStore;

private volatile boolean isActive = false;
Expand Down Expand Up @@ -312,17 +319,24 @@ public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, bool
log.warn("Skipping add dag because this instance of DagManager is not active for dag: {}", dag);
return;
}

DagId dagId = DagManagerUtils.generateDagId(dag);
if (persist) {
//Persist the dag
// Persist the dag
this.dagStateStore.writeCheckpoint(dag);
// After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
// to avoid duplicate executions upon leadership change
if (this.dagActionStore.isPresent()) {
this.dagActionStore.get().deleteDagAction(dagId.toDagAction(DagActionStore.FlowActionType.LAUNCH));
}
}
int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
// Add the dag to the specific queue determined by flowExecutionId
// Flow cancellation request has to be forwarded to the same DagManagerThread where the
// flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
// specific DagManagerThread queue
if (!this.runQueue[queueId].offer(dag)) {
throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue");
throw new IOException("Could not add dag" + dagId + "to queue");
}
if (setStatus) {
submitEventsAndSetStatus(dag);
Expand Down Expand Up @@ -511,8 +525,6 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId);
this.dagManagerMetrics.incrementFailedLaunchCount();
}
// Upon handling the action, delete it so on leadership change this is not duplicated
this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn(String.format("Could not create URI object for flowId %s due to exception", flowId), e);
this.dagManagerMetrics.incrementFailedLaunchCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
}
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
java.util.Optional<String> flowExecutionId = TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);

// Unexpected result because flowExecutionId should be provided by the above call to 'addFlowExecutionIdIfAbsent'
if (!flowExecutionId.isPresent()) {
_log.warn("FlowMetadata does not contain flowExecutionId when it should have been provided. Skipping execution "
+ "of: {}", spec);
return;
}
DagActionStore.DagAction flowAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId.get(), DagActionStore.FlowActionType.LAUNCH);

// If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later
Expand All @@ -266,9 +276,6 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
return;
}

String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
DagActionStore.DagAction flowAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent);
_log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
Expand Down Expand Up @@ -306,7 +313,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Spec jobSpec = jobExecutionPlan.getJobSpec();

if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
_log.warn("JobSpec does not contain flowExecutionId.");
_log.warn("JobSpec does not contain flowExecutionId: {}", jobSpec);
}

Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
Expand Down Expand Up @@ -335,9 +342,10 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
Optional.of(flowAction.getFlowExecutionId()));
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
Expand All @@ -349,7 +357,7 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalF
public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
throws IOException {
try {
//Send the dag to the DagManager.
// Send the dag to the DagManager
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Maps;
import com.typesafe.config.Config;

import java.util.Optional;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
Expand All @@ -47,6 +48,13 @@ static Map<String, String> getFlowMetadata(Config flowConfig) {
return metadata;
}

/**
 * Looks up the flow execution id in the given flow metadata.
 *
 * @param flowMetadata map queried with the key {@code TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD}
 * @return the mapped value wrapped in an {@link Optional}, or an empty Optional when the lookup yields
 *         {@code null}
 */
public static Optional<String> getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) {
  String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
  return Optional.ofNullable(flowExecutionId);
}

static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobExecutionPlan jobExecutionPlan) {
Map<String, String> jobMetadata = Maps.newHashMap();
JobSpec jobSpec = jobExecutionPlan.getJobSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.gobblin.service.monitoring;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -78,7 +77,6 @@ public String load(String key) throws Exception {
protected LoadingCache<String, String>
dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader);

protected DagActionStore dagActionStore;
@Getter
@VisibleForTesting
protected DagManager dagManager;
Expand All @@ -90,13 +88,12 @@ public String load(String key) throws Exception {

// Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore dagActionStore, DagManager dagManager,
int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagManager, int numThreads,
FlowCatalog flowCatalog, Orchestrator orchestrator, boolean isMultiActiveSchedulerEnabled) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID().toString())),
numThreads);
this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
Expand Down Expand Up @@ -168,7 +165,7 @@ protected void processMessage(DecodeableKafkaRecord message) {
throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
+ "mode for flowAction: %s", dagAction));
}
submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId);
submitFlowToDagManagerHelper(dagAction);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType);
this.unexpectedErrors.mark();
Expand All @@ -195,15 +192,15 @@ protected void processMessage(DecodeableKafkaRecord message) {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}

protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) {
protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
// Retrieve job execution plan by recompiling the flow spec to send to the DagManager
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
FlowId flowId = new FlowId().setFlowGroup(dagAction.getFlowGroup()).setFlowName(dagAction.getFlowName());
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId
this.orchestrator.submitFlowToDagManager(spec, Optional.of(flowExecutionId));
this.orchestrator.submitFlowToDagManager(spec, dagAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
this.failedFlowLaunchSubmissions.mark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class DagActionStoreChangeMonitorFactory implements Provider<DagActionSto
static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";

private final Config config;
private DagActionStore dagActionStore;
private DagManager dagManager;
private FlowCatalog flowCatalog;
private Orchestrator orchestrator;
Expand All @@ -53,7 +52,6 @@ public DagActionStoreChangeMonitorFactory(Config config, DagActionStore dagActio
FlowCatalog flowCatalog, Orchestrator orchestrator,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
Expand All @@ -68,8 +66,8 @@ private DagActionStoreChangeMonitor createDagActionStoreMonitor()
String topic = ""; // Pass empty string because we expect underlying client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);

return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, this.dagActionStore, this.dagManager,
numThreads, flowCatalog, orchestrator, isMultiActiveSchedulerEnabled);
return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, this.dagManager, numThreads, flowCatalog,
orchestrator, isMultiActiveSchedulerEnabled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ void testAddDeleteSpec() throws Exception {
.thenReturn(Collections.singletonList(flowExecutionId3));

// mock add spec
// For the very first dag to be added, add a dag action to the store and check it's deleted by the addDag call
dagManager.getDagActionStore().get().addDagAction("group0", "flow0", Long.toString(flowExecutionId1), DagActionStore.FlowActionType.LAUNCH);
dagManager.addDag(dag1, true, true);
Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0", "flow0", Long.toString(flowExecutionId1), DagActionStore.FlowActionType.LAUNCH));
dagManager.addDag(dag2, true, true);
dagManager.addDag(dag3, true, true);

Expand Down