Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,23 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@Getter
private final Optional<DagActionStore> dagActionStore;

private final ClassAliasResolver<SpecCompiler> aliasResolver;

public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
Optional<FlowTriggerHandler> flowTriggerHandler, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<DagActionStore> dagActionStore) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerHandler = flowTriggerHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.dagActionStore = dagActionStore;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
Expand Down Expand Up @@ -161,9 +165,9 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<Logger> log, Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<DagActionStore> dagActionStore) {
this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, flowTriggerHandler,
sharedFlowMetricsSingleton);
sharedFlowMetricsSingleton, dagActionStore);
}


Expand Down Expand Up @@ -266,7 +270,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
return;
}

String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
String flowExecutionId = TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation behind changing this function here? From my point of view, we should enforce the existence of a flow execution ID here, since a DAG must have an associated flow execution ID; otherwise we should block progression, as the DAG is likely corrupt.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I changed this to abstract away the use of the key. I could either update the function to not provide a default when no value exists (always expecting a flow execution ID to be present) or revert this change. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure in that case

DagActionStore.DagAction flowAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent);
Expand All @@ -292,7 +296,11 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil

// Depending on if DagManager is present, handle execution
if (this.dagManager.isPresent()) {
submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
DagActionStore.DagAction launchAction =
new DagActionStore.DagAction(flowGroup, flowName,
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata),
DagActionStore.FlowActionType.LAUNCH);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we hoist the DagAction initialization on line 274 above the if block, so it can be shared by the else branch?

submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag, launchAction);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
Expand Down Expand Up @@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
Optional.of(flowAction.getFlowExecutionId()));
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
} else {
_log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
}

public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
DagActionStore.DagAction launchAction)
throws IOException {
try {
//Send the dag to the DagManager.
// Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
this.dagActionStore.get().deleteDagAction(launchAction);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the DagManager hasn't successfully handled it (e.g. the service fails after line 364, but before the DagManager actions complete)? Would the action/event be lost?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I recognize we're in an interim phase of our multi-leader journey, where we have yet to integrate the DagManager with the MultiActiveLeaseArbiter... This is only temporary, so let's not over-engineer.

I'm concerned, however, that this PR's approach presumes successful handling for non-durable/in-memory state. A safer pattern would be to remove the LAUNCH action from the DagActionStore only after the DagManager has finished launching the flow execution.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If DagManager.addDag completes, then we have already persisted (code) the dag in the MySQL DagStateStore, and its status and completion will be tracked by existing DagManager functionality via a series of checkpoints once it's submitted to the executor, fails, the next dag is called, etc. We load all dags from the state store upon startup, and while the DagManager is active it should keep track of them. In the pre-multi-active DagManager, I don't believe we necessarily need to proceed further. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great news. We're all set then! Sorry, for some reason I thought it merely added the DAG to a queue, and hadn't recalled that it synchronously persists the DAG.

} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ static Map<String, String> getFlowMetadata(Config flowConfig) {
return metadata;
}

/**
 * Looks up the flow execution id stored in the supplied flow metadata map under
 * {@code TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD}.
 *
 * @param flowMetadata flow metadata map to read the flow execution id from
 * @return the value mapped to the flow execution id key, or the placeholder
 *         {@code "<<no flowExecutionId>>"} when the map has no entry for that key
 */
public static String getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) {
  // Fall back to the placeholder only when the key is absent altogether, so a key that is
  // explicitly mapped (even to null) is returned as stored.
  if (!flowMetadata.containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)) {
    return "<<no flowExecutionId>>";
  }
  return flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the abstraction! As for where this should live: don't we already have another utils class with static methods for extracting the flow group and flow name?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class has methods relating to this map of FlowMetadata specifically. It uses a different key than the normal flow group/name/execution id keys from ConfigurationKeys. I'm keeping related functionality in one place here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I now see that DagManagerUtils is unfortunately only applicable to that specific context, and not here as well... :(


static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobExecutionPlan jobExecutionPlan) {
Map<String, String> jobMetadata = Maps.newHashMap();
JobSpec jobSpec = jobExecutionPlan.getJobSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
Expand Down Expand Up @@ -115,6 +116,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
protected final Boolean warmStandbyEnabled;
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowTriggerHandler> flowTriggerHandler;
protected final Optional<DagActionStore> dagActionStore;
@Getter
protected final Map<String, Spec> scheduledFlowSpecs;
@Getter
Expand Down Expand Up @@ -171,7 +173,7 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
Optional<FlowTriggerHandler> flowTriggerHandler, Optional<DagActionStore> dagActionStore) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);

_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
Expand All @@ -188,6 +190,7 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
this.warmStandbyEnabled = warmStandbyEnabled;
this.quotaManager = quotaManager;
this.flowTriggerHandler = flowTriggerHandler;
this.dagActionStore = dagActionStore;
// Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
// of the scheduler. If one metric exists, then the others should as well.
MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS);
Expand All @@ -210,12 +213,12 @@ public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusG
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager, SchedulerService schedulerService,
Optional<Logger> log, boolean warmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<DagActionStore> dagActionStore)
throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log, flowTriggerHandler,
sharedFlowMetricsSingleton),
schedulerService, quotaManager, log, warmStandbyEnabled, flowTriggerHandler);
sharedFlowMetricsSingleton, dagActionStore),
schedulerService, quotaManager, log, warmStandbyEnabled, flowTriggerHandler, dagActionStore);
}

public synchronized void setActive(boolean isActive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.gobblin.service.monitoring;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -168,7 +167,7 @@ protected void processMessage(DecodeableKafkaRecord message) {
throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
+ "mode for flowAction: %s", dagAction));
}
submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId);
submitFlowToDagManagerHelper(dagAction);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType);
this.unexpectedErrors.mark();
Expand All @@ -195,15 +194,15 @@ protected void processMessage(DecodeableKafkaRecord message) {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}

protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) {
protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction) {
Comment on lines -198 to +195

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent--much clearer method signature!

// Retrieve job execution plan by recompiling the flow spec to send to the DagManager
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
FlowId flowId = new FlowId().setFlowGroup(dagAction.getFlowGroup()).setFlowName(dagAction.getFlowName());
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Pass flowExecutionId to DagManager to be used for scheduled flows that do not already contain a flowExecutionId
this.orchestrator.submitFlowToDagManager(spec, Optional.of(flowExecutionId));
this.orchestrator.submitFlowToDagManager(spec, dagAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
this.failedFlowLaunchSubmissions.mark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void setup() throws Exception {
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.mockStatusGenerator, Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger),
Optional.of(this._mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
ConfigUtils.propertiesToConfig(orchestratorProperties)));
ConfigUtils.propertiesToConfig(orchestratorProperties)), Optional.of(mock(DagActionStore.class)));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Properties;

import org.apache.gobblin.runtime.api.DagActionStore;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -355,7 +356,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
FlowTriggerHandler.class)), Optional.of(Mockito.mock(DagActionStore.class)));

schedulerService.startAsync().awaitRunning();
scheduler.startUp();
Expand All @@ -374,7 +375,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new GobblinServiceJobScheduler("testscheduler",
ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
FlowTriggerHandler.class)), Optional.of(Mockito.mock(DagActionStore.class)));

schedulerWithWarmStandbyEnabled.startUp();
schedulerWithWarmStandbyEnabled.setActive(true);
Expand All @@ -397,7 +398,7 @@ public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled) throws Exception {
super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
FlowTriggerHandler.class)), Optional.of(Mockito.mock(DagActionStore.class)));
if (schedulerService != null) {
hasScheduler = true;
}
Expand Down